python (3.12.0)
1 import collections
2 import enum
3 import warnings
4 try:
5 import ssl
6 except ImportError: # pragma: no cover
7 ssl = None
8
9 from . import constants
10 from . import exceptions
11 from . import protocols
12 from . import transports
13 from .log import logger
14
if ssl is not None:
    # Exceptions that the handlers in this module catch and treat as
    # "cannot make progress right now" instead of as a fatal error
    # (see the ``except SSLAgainErrors`` clauses in SSLProtocol).
    # Only defined when the ssl module could be imported.
    SSLAgainErrors = (ssl.SSLWantReadError, ssl.SSLSyscallError)
17
18
class SSLProtocolState(enum.Enum):
    """States of the SSL layer managed by SSLProtocol.

    SSLProtocol._set_state() only allows the forward chain
    UNWRAPPED -> DO_HANDSHAKE -> WRAPPED -> FLUSHING -> SHUTDOWN,
    plus a switch to UNWRAPPED from any state.
    """

    # Initial state; also entered on abort, handshake failure and
    # connection_lost().
    UNWRAPPED = "UNWRAPPED"
    # Entered by SSLProtocol._start_handshake().
    DO_HANDSHAKE = "DO_HANDSHAKE"
    # Entered when the handshake completed without an error.
    WRAPPED = "WRAPPED"
    # Entered by _start_shutdown() or when EOF arrives while WRAPPED.
    FLUSHING = "FLUSHING"
    # Entered by _do_flush() right before _do_shutdown() is called.
    SHUTDOWN = "SHUTDOWN"
25
26
class AppProtocolState(enum.Enum):
    """Which callbacks of the application protocol have been invoked so far.

    SSLProtocol consults this state before calling connection_made(),
    eof_received() and connection_lost() on the application protocol.
    """
    # This tracks the state of app protocol (https://git.io/fj59P):
    #
    #     INIT -cm-> CON_MADE [-dr*->] [-er-> EOF?] -cl-> CON_LOST
    #
    # * cm: connection_made()
    # * dr: data_received()
    # * er: eof_received()
    # * cl: connection_lost()

    # connection_made() has not been called yet.
    STATE_INIT = "STATE_INIT"
    # connection_made() was called (or SSLProtocol was created with
    # call_connection_made=False).
    STATE_CON_MADE = "STATE_CON_MADE"
    # eof_received() is being / has been called.
    STATE_EOF = "STATE_EOF"
    # connection_lost() has been scheduled.
    STATE_CON_LOST = "STATE_CON_LOST"
41
42
def _create_transport_context(server_side, server_hostname):
    """Build the default client-side SSLContext.

    Used when the caller passed ``ssl=True`` and therefore no explicit
    context is available.  A server cannot work without an explicitly
    configured context, so *server_side* being true raises ValueError.

    Hostname checking is switched off when *server_hostname* is empty or
    None; otherwise the defaults of ssl.create_default_context() are kept.
    """
    if server_side:
        raise ValueError('Server side SSL needs a valid SSLContext')

    # ssl.create_default_context() yields settings that are secure for
    # client connections.
    context = ssl.create_default_context()
    hostname_given = bool(server_hostname)
    if not hostname_given:
        context.check_hostname = False
    return context
55
56
def add_flowcontrol_defaults(high, low, kb):
    """Fill in missing flow-control water marks and validate them.

    *high* and *low* may each be None:

    * both None: high is ``kb * 1024`` and low a quarter of it;
    * only high None: high is four times *low*;
    * only low None: low is a quarter of *high* (integer division).

    Returns the tuple ``(high, low)``.  Raises ValueError unless
    ``high >= low >= 0``.
    """
    if high is not None:
        upper = high
    elif low is None:
        upper = kb * 1024
    else:
        upper = 4 * low

    lower = upper // 4 if low is None else low

    if not upper >= lower >= 0:
        raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
                         (upper, lower))

    return upper, lower
76
77
class _SSLProtocolTransport(transports._FlowControlMixin,
                            transports.Transport):
    """Transport object handed to the application protocol.

    Every operation is delegated to the owning SSLProtocol instance.
    The reference to that protocol is dropped when close() is called on
    a transport that is already closed, so the closing and writing
    methods tolerate ``self._ssl_protocol`` being None.
    """

    _start_tls_compatible = True
    _sendfile_compatible = constants._SendfileMode.FALLBACK

    def __init__(self, loop, ssl_protocol):
        self._loop = loop
        self._ssl_protocol = ssl_protocol
        self._closed = False

    def get_extra_info(self, name, default=None):
        """Get optional transport information."""
        return self._ssl_protocol._get_extra_info(name, default)

    def set_protocol(self, protocol):
        """Replace the application protocol served by the SSL protocol."""
        self._ssl_protocol._set_app_protocol(protocol)

    def get_protocol(self):
        """Return the current application protocol."""
        return self._ssl_protocol._app_protocol

    def is_closing(self):
        """Return True once close(), abort() or _force_close() was called."""
        return self._closed

    def close(self):
        """Close the transport.

        Buffered data will be flushed asynchronously.  No more data
        will be received.  After all buffered data is flushed, the
        protocol's connection_lost() method will (eventually) be called
        with None as its argument.

        Calling close() on an already closed transport only drops the
        reference to the SSL protocol.
        """
        if not self._closed:
            self._closed = True
            if self._ssl_protocol is not None:
                self._ssl_protocol._start_shutdown()
        else:
            self._ssl_protocol = None

    def __del__(self, _warnings=warnings):
        # *_warnings* is bound as a default so the module is still
        # reachable here during interpreter shutdown.
        if not self._closed:
            self._closed = True
            _warnings.warn(
                "unclosed transport <asyncio._SSLProtocolTransport "
                "object>", ResourceWarning)

    def is_reading(self):
        """Return True unless the application paused reading."""
        return not self._ssl_protocol._app_reading_paused

    def pause_reading(self):
        """Pause the receiving end.

        No data will be passed to the protocol's data_received()
        method until resume_reading() is called.
        """
        self._ssl_protocol._pause_reading()

    def resume_reading(self):
        """Resume the receiving end.

        Data received will once again be passed to the protocol's
        data_received() method.
        """
        self._ssl_protocol._resume_reading()

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the high- and low-water limits for write flow control.

        These two values control when to call the protocol's
        pause_writing() and resume_writing() methods.  If specified,
        the low-water limit must be less than or equal to the
        high-water limit.  Neither value can be negative.

        The defaults are implementation-specific.  If only the
        high-water limit is given, the low-water limit defaults to an
        implementation-specific value less than or equal to the
        high-water limit.  Setting high to zero forces low to zero as
        well, and causes pause_writing() to be called whenever the
        buffer becomes non-empty.  Setting low to zero causes
        resume_writing() to be called only once the buffer is empty.
        Use of zero for either limit is generally sub-optimal as it
        reduces opportunities for doing I/O and computation
        concurrently.
        """
        self._ssl_protocol._set_write_buffer_limits(high, low)
        # Re-evaluate immediately: the new limits may require pausing or
        # resuming the application right away.
        self._ssl_protocol._control_app_writing()

    def get_write_buffer_limits(self):
        """Return the write flow control limits as ``(low, high)``."""
        return (self._ssl_protocol._outgoing_low_water,
                self._ssl_protocol._outgoing_high_water)

    def get_write_buffer_size(self):
        """Return the current size of the write buffers."""
        return self._ssl_protocol._get_write_buffer_size()

    def set_read_buffer_limits(self, high=None, low=None):
        """Set the high- and low-water limits for read flow control.

        These two values control when to call the upstream transport's
        pause_reading() and resume_reading() methods.  If specified,
        the low-water limit must be less than or equal to the
        high-water limit.  Neither value can be negative.

        The defaults are implementation-specific.  If only the
        high-water limit is given, the low-water limit defaults to an
        implementation-specific value less than or equal to the
        high-water limit.  Setting high to zero forces low to zero as
        well, and causes pause_reading() to be called whenever the
        buffer becomes non-empty.  Setting low to zero causes
        resume_reading() to be called only once the buffer is empty.
        Use of zero for either limit is generally sub-optimal as it
        reduces opportunities for doing I/O and computation
        concurrently.
        """
        self._ssl_protocol._set_read_buffer_limits(high, low)
        self._ssl_protocol._control_ssl_reading()

    def get_read_buffer_limits(self):
        """Return the read flow control limits as ``(low, high)``."""
        return (self._ssl_protocol._incoming_low_water,
                self._ssl_protocol._incoming_high_water)

    def get_read_buffer_size(self):
        """Return the current size of the read buffer."""
        return self._ssl_protocol._get_read_buffer_size()

    @property
    def _protocol_paused(self):
        # Required for sendfile fallback pause_writing/resume_writing logic
        return self._ssl_protocol._app_writing_paused

    def write(self, data):
        """Write some data bytes to the transport.

        This does not block; it buffers the data and arranges for it
        to be sent out asynchronously.

        Raises TypeError when *data* is not bytes, bytearray or
        memoryview.  Empty data is ignored, and so is any data written
        after the reference to the SSL protocol has been dropped.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f"data: expecting a bytes-like instance, "
                            f"got {type(data).__name__}")
        if not data:
            return
        if self._ssl_protocol is not None:
            self._ssl_protocol._write_appdata((data,))

    def writelines(self, list_of_data):
        """Write a list (or any iterable) of data bytes to the transport.

        The iterable is handed to the SSL protocol as is; the items are
        neither concatenated nor type-checked here.
        """
        if self._ssl_protocol is not None:
            self._ssl_protocol._write_appdata(list_of_data)

    def write_eof(self):
        """Close the write end after flushing buffered data.

        This raises :exc:`NotImplementedError` right now.
        """
        raise NotImplementedError

    def can_write_eof(self):
        """Return True if this transport supports write_eof(), False if not."""
        return False

    def abort(self):
        """Close the transport immediately.

        Buffered data will be lost.  No more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        self._closed = True
        if self._ssl_protocol is not None:
            self._ssl_protocol._abort()

    def _force_close(self, exc):
        """Mark the transport closed and abort the SSL protocol with *exc*."""
        self._closed = True
        # The protocol reference may already be gone (see close()).
        if self._ssl_protocol is not None:
            self._ssl_protocol._abort(exc)

    def _test__append_write_backlog(self, data):
        # for test only
        self._ssl_protocol._write_backlog.append(data)
        self._ssl_protocol._write_buffer_size += len(data)
258
259
260 class ESC[4;38;5;81mSSLProtocol(ESC[4;38;5;149mprotocolsESC[4;38;5;149m.ESC[4;38;5;149mBufferedProtocol):
261 max_size = 256 * 1024 # Buffer size passed to read()
262
263 _handshake_start_time = None
264 _handshake_timeout_handle = None
265 _shutdown_timeout_handle = None
266
267 def __init__(self, loop, app_protocol, sslcontext, waiter,
268 server_side=False, server_hostname=None,
269 call_connection_made=True,
270 ssl_handshake_timeout=None,
271 ssl_shutdown_timeout=None):
272 if ssl is None:
273 raise RuntimeError("stdlib ssl module not available")
274
275 self._ssl_buffer = bytearray(self.max_size)
276 self._ssl_buffer_view = memoryview(self._ssl_buffer)
277
278 if ssl_handshake_timeout is None:
279 ssl_handshake_timeout = constants.SSL_HANDSHAKE_TIMEOUT
280 elif ssl_handshake_timeout <= 0:
281 raise ValueError(
282 f"ssl_handshake_timeout should be a positive number, "
283 f"got {ssl_handshake_timeout}")
284 if ssl_shutdown_timeout is None:
285 ssl_shutdown_timeout = constants.SSL_SHUTDOWN_TIMEOUT
286 elif ssl_shutdown_timeout <= 0:
287 raise ValueError(
288 f"ssl_shutdown_timeout should be a positive number, "
289 f"got {ssl_shutdown_timeout}")
290
291 if not sslcontext:
292 sslcontext = _create_transport_context(
293 server_side, server_hostname)
294
295 self._server_side = server_side
296 if server_hostname and not server_side:
297 self._server_hostname = server_hostname
298 else:
299 self._server_hostname = None
300 self._sslcontext = sslcontext
301 # SSL-specific extra info. More info are set when the handshake
302 # completes.
303 self._extra = dict(sslcontext=sslcontext)
304
305 # App data write buffering
306 self._write_backlog = collections.deque()
307 self._write_buffer_size = 0
308
309 self._waiter = waiter
310 self._loop = loop
311 self._set_app_protocol(app_protocol)
312 self._app_transport = None
313 self._app_transport_created = False
314 # transport, ex: SelectorSocketTransport
315 self._transport = None
316 self._ssl_handshake_timeout = ssl_handshake_timeout
317 self._ssl_shutdown_timeout = ssl_shutdown_timeout
318 # SSL and state machine
319 self._incoming = ssl.MemoryBIO()
320 self._outgoing = ssl.MemoryBIO()
321 self._state = SSLProtocolState.UNWRAPPED
322 self._conn_lost = 0 # Set when connection_lost called
323 if call_connection_made:
324 self._app_state = AppProtocolState.STATE_INIT
325 else:
326 self._app_state = AppProtocolState.STATE_CON_MADE
327 self._sslobj = self._sslcontext.wrap_bio(
328 self._incoming, self._outgoing,
329 server_side=self._server_side,
330 server_hostname=self._server_hostname)
331
332 # Flow Control
333
334 self._ssl_writing_paused = False
335
336 self._app_reading_paused = False
337
338 self._ssl_reading_paused = False
339 self._incoming_high_water = 0
340 self._incoming_low_water = 0
341 self._set_read_buffer_limits()
342 self._eof_received = False
343
344 self._app_writing_paused = False
345 self._outgoing_high_water = 0
346 self._outgoing_low_water = 0
347 self._set_write_buffer_limits()
348 self._get_app_transport()
349
350 def _set_app_protocol(self, app_protocol):
351 self._app_protocol = app_protocol
352 # Make fast hasattr check first
353 if (hasattr(app_protocol, 'get_buffer') and
354 isinstance(app_protocol, protocols.BufferedProtocol)):
355 self._app_protocol_get_buffer = app_protocol.get_buffer
356 self._app_protocol_buffer_updated = app_protocol.buffer_updated
357 self._app_protocol_is_buffer = True
358 else:
359 self._app_protocol_is_buffer = False
360
361 def _wakeup_waiter(self, exc=None):
362 if self._waiter is None:
363 return
364 if not self._waiter.cancelled():
365 if exc is not None:
366 self._waiter.set_exception(exc)
367 else:
368 self._waiter.set_result(None)
369 self._waiter = None
370
371 def _get_app_transport(self):
372 if self._app_transport is None:
373 if self._app_transport_created:
374 raise RuntimeError('Creating _SSLProtocolTransport twice')
375 self._app_transport = _SSLProtocolTransport(self._loop, self)
376 self._app_transport_created = True
377 return self._app_transport
378
379 def connection_made(self, transport):
380 """Called when the low-level connection is made.
381
382 Start the SSL handshake.
383 """
384 self._transport = transport
385 self._start_handshake()
386
387 def connection_lost(self, exc):
388 """Called when the low-level connection is lost or closed.
389
390 The argument is an exception object or None (the latter
391 meaning a regular EOF is received or the connection was
392 aborted or closed).
393 """
394 self._write_backlog.clear()
395 self._outgoing.read()
396 self._conn_lost += 1
397
398 # Just mark the app transport as closed so that its __dealloc__
399 # doesn't complain.
400 if self._app_transport is not None:
401 self._app_transport._closed = True
402
403 if self._state != SSLProtocolState.DO_HANDSHAKE:
404 if (
405 self._app_state == AppProtocolState.STATE_CON_MADE or
406 self._app_state == AppProtocolState.STATE_EOF
407 ):
408 self._app_state = AppProtocolState.STATE_CON_LOST
409 self._loop.call_soon(self._app_protocol.connection_lost, exc)
410 self._set_state(SSLProtocolState.UNWRAPPED)
411 self._transport = None
412 self._app_transport = None
413 self._app_protocol = None
414 self._wakeup_waiter(exc)
415
416 if self._shutdown_timeout_handle:
417 self._shutdown_timeout_handle.cancel()
418 self._shutdown_timeout_handle = None
419 if self._handshake_timeout_handle:
420 self._handshake_timeout_handle.cancel()
421 self._handshake_timeout_handle = None
422
423 def get_buffer(self, n):
424 want = n
425 if want <= 0 or want > self.max_size:
426 want = self.max_size
427 if len(self._ssl_buffer) < want:
428 self._ssl_buffer = bytearray(want)
429 self._ssl_buffer_view = memoryview(self._ssl_buffer)
430 return self._ssl_buffer_view
431
432 def buffer_updated(self, nbytes):
433 self._incoming.write(self._ssl_buffer_view[:nbytes])
434
435 if self._state == SSLProtocolState.DO_HANDSHAKE:
436 self._do_handshake()
437
438 elif self._state == SSLProtocolState.WRAPPED:
439 self._do_read()
440
441 elif self._state == SSLProtocolState.FLUSHING:
442 self._do_flush()
443
444 elif self._state == SSLProtocolState.SHUTDOWN:
445 self._do_shutdown()
446
447 def eof_received(self):
448 """Called when the other end of the low-level stream
449 is half-closed.
450
451 If this returns a false value (including None), the transport
452 will close itself. If it returns a true value, closing the
453 transport is up to the protocol.
454 """
455 self._eof_received = True
456 try:
457 if self._loop.get_debug():
458 logger.debug("%r received EOF", self)
459
460 if self._state == SSLProtocolState.DO_HANDSHAKE:
461 self._on_handshake_complete(ConnectionResetError)
462
463 elif self._state == SSLProtocolState.WRAPPED:
464 self._set_state(SSLProtocolState.FLUSHING)
465 if self._app_reading_paused:
466 return True
467 else:
468 self._do_flush()
469
470 elif self._state == SSLProtocolState.FLUSHING:
471 self._do_write()
472 self._set_state(SSLProtocolState.SHUTDOWN)
473 self._do_shutdown()
474
475 elif self._state == SSLProtocolState.SHUTDOWN:
476 self._do_shutdown()
477
478 except Exception:
479 self._transport.close()
480 raise
481
482 def _get_extra_info(self, name, default=None):
483 if name in self._extra:
484 return self._extra[name]
485 elif self._transport is not None:
486 return self._transport.get_extra_info(name, default)
487 else:
488 return default
489
490 def _set_state(self, new_state):
491 allowed = False
492
493 if new_state == SSLProtocolState.UNWRAPPED:
494 allowed = True
495
496 elif (
497 self._state == SSLProtocolState.UNWRAPPED and
498 new_state == SSLProtocolState.DO_HANDSHAKE
499 ):
500 allowed = True
501
502 elif (
503 self._state == SSLProtocolState.DO_HANDSHAKE and
504 new_state == SSLProtocolState.WRAPPED
505 ):
506 allowed = True
507
508 elif (
509 self._state == SSLProtocolState.WRAPPED and
510 new_state == SSLProtocolState.FLUSHING
511 ):
512 allowed = True
513
514 elif (
515 self._state == SSLProtocolState.FLUSHING and
516 new_state == SSLProtocolState.SHUTDOWN
517 ):
518 allowed = True
519
520 if allowed:
521 self._state = new_state
522
523 else:
524 raise RuntimeError(
525 'cannot switch state from {} to {}'.format(
526 self._state, new_state))
527
528 # Handshake flow
529
530 def _start_handshake(self):
531 if self._loop.get_debug():
532 logger.debug("%r starts SSL handshake", self)
533 self._handshake_start_time = self._loop.time()
534 else:
535 self._handshake_start_time = None
536
537 self._set_state(SSLProtocolState.DO_HANDSHAKE)
538
539 # start handshake timeout count down
540 self._handshake_timeout_handle = \
541 self._loop.call_later(self._ssl_handshake_timeout,
542 lambda: self._check_handshake_timeout())
543
544 self._do_handshake()
545
546 def _check_handshake_timeout(self):
547 if self._state == SSLProtocolState.DO_HANDSHAKE:
548 msg = (
549 f"SSL handshake is taking longer than "
550 f"{self._ssl_handshake_timeout} seconds: "
551 f"aborting the connection"
552 )
553 self._fatal_error(ConnectionAbortedError(msg))
554
555 def _do_handshake(self):
556 try:
557 self._sslobj.do_handshake()
558 except SSLAgainErrors:
559 self._process_outgoing()
560 except ssl.SSLError as exc:
561 self._on_handshake_complete(exc)
562 else:
563 self._on_handshake_complete(None)
564
565 def _on_handshake_complete(self, handshake_exc):
566 if self._handshake_timeout_handle is not None:
567 self._handshake_timeout_handle.cancel()
568 self._handshake_timeout_handle = None
569
570 sslobj = self._sslobj
571 try:
572 if handshake_exc is None:
573 self._set_state(SSLProtocolState.WRAPPED)
574 else:
575 raise handshake_exc
576
577 peercert = sslobj.getpeercert()
578 except Exception as exc:
579 self._set_state(SSLProtocolState.UNWRAPPED)
580 if isinstance(exc, ssl.CertificateError):
581 msg = 'SSL handshake failed on verifying the certificate'
582 else:
583 msg = 'SSL handshake failed'
584 self._fatal_error(exc, msg)
585 self._wakeup_waiter(exc)
586 return
587
588 if self._loop.get_debug():
589 dt = self._loop.time() - self._handshake_start_time
590 logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
591
592 # Add extra info that becomes available after handshake.
593 self._extra.update(peercert=peercert,
594 cipher=sslobj.cipher(),
595 compression=sslobj.compression(),
596 ssl_object=sslobj)
597 if self._app_state == AppProtocolState.STATE_INIT:
598 self._app_state = AppProtocolState.STATE_CON_MADE
599 self._app_protocol.connection_made(self._get_app_transport())
600 self._wakeup_waiter()
601 self._do_read()
602
603 # Shutdown flow
604
605 def _start_shutdown(self):
606 if (
607 self._state in (
608 SSLProtocolState.FLUSHING,
609 SSLProtocolState.SHUTDOWN,
610 SSLProtocolState.UNWRAPPED
611 )
612 ):
613 return
614 if self._app_transport is not None:
615 self._app_transport._closed = True
616 if self._state == SSLProtocolState.DO_HANDSHAKE:
617 self._abort()
618 else:
619 self._set_state(SSLProtocolState.FLUSHING)
620 self._shutdown_timeout_handle = self._loop.call_later(
621 self._ssl_shutdown_timeout,
622 lambda: self._check_shutdown_timeout()
623 )
624 self._do_flush()
625
626 def _check_shutdown_timeout(self):
627 if (
628 self._state in (
629 SSLProtocolState.FLUSHING,
630 SSLProtocolState.SHUTDOWN
631 )
632 ):
633 self._transport._force_close(
634 exceptions.TimeoutError('SSL shutdown timed out'))
635
636 def _do_flush(self):
637 self._do_read()
638 self._set_state(SSLProtocolState.SHUTDOWN)
639 self._do_shutdown()
640
641 def _do_shutdown(self):
642 try:
643 if not self._eof_received:
644 self._sslobj.unwrap()
645 except SSLAgainErrors:
646 self._process_outgoing()
647 except ssl.SSLError as exc:
648 self._on_shutdown_complete(exc)
649 else:
650 self._process_outgoing()
651 self._call_eof_received()
652 self._on_shutdown_complete(None)
653
654 def _on_shutdown_complete(self, shutdown_exc):
655 if self._shutdown_timeout_handle is not None:
656 self._shutdown_timeout_handle.cancel()
657 self._shutdown_timeout_handle = None
658
659 if shutdown_exc:
660 self._fatal_error(shutdown_exc)
661 else:
662 self._loop.call_soon(self._transport.close)
663
664 def _abort(self):
665 self._set_state(SSLProtocolState.UNWRAPPED)
666 if self._transport is not None:
667 self._transport.abort()
668
669 # Outgoing flow
670
671 def _write_appdata(self, list_of_data):
672 if (
673 self._state in (
674 SSLProtocolState.FLUSHING,
675 SSLProtocolState.SHUTDOWN,
676 SSLProtocolState.UNWRAPPED
677 )
678 ):
679 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
680 logger.warning('SSL connection is closed')
681 self._conn_lost += 1
682 return
683
684 for data in list_of_data:
685 self._write_backlog.append(data)
686 self._write_buffer_size += len(data)
687
688 try:
689 if self._state == SSLProtocolState.WRAPPED:
690 self._do_write()
691
692 except Exception as ex:
693 self._fatal_error(ex, 'Fatal error on SSL protocol')
694
695 def _do_write(self):
696 try:
697 while self._write_backlog:
698 data = self._write_backlog[0]
699 count = self._sslobj.write(data)
700 data_len = len(data)
701 if count < data_len:
702 self._write_backlog[0] = data[count:]
703 self._write_buffer_size -= count
704 else:
705 del self._write_backlog[0]
706 self._write_buffer_size -= data_len
707 except SSLAgainErrors:
708 pass
709 self._process_outgoing()
710
711 def _process_outgoing(self):
712 if not self._ssl_writing_paused:
713 data = self._outgoing.read()
714 if len(data):
715 self._transport.write(data)
716 self._control_app_writing()
717
718 # Incoming flow
719
720 def _do_read(self):
721 if (
722 self._state not in (
723 SSLProtocolState.WRAPPED,
724 SSLProtocolState.FLUSHING,
725 )
726 ):
727 return
728 try:
729 if not self._app_reading_paused:
730 if self._app_protocol_is_buffer:
731 self._do_read__buffered()
732 else:
733 self._do_read__copied()
734 if self._write_backlog:
735 self._do_write()
736 else:
737 self._process_outgoing()
738 self._control_ssl_reading()
739 except Exception as ex:
740 self._fatal_error(ex, 'Fatal error on SSL protocol')
741
742 def _do_read__buffered(self):
743 offset = 0
744 count = 1
745
746 buf = self._app_protocol_get_buffer(self._get_read_buffer_size())
747 wants = len(buf)
748
749 try:
750 count = self._sslobj.read(wants, buf)
751
752 if count > 0:
753 offset = count
754 while offset < wants:
755 count = self._sslobj.read(wants - offset, buf[offset:])
756 if count > 0:
757 offset += count
758 else:
759 break
760 else:
761 self._loop.call_soon(lambda: self._do_read())
762 except SSLAgainErrors:
763 pass
764 if offset > 0:
765 self._app_protocol_buffer_updated(offset)
766 if not count:
767 # close_notify
768 self._call_eof_received()
769 self._start_shutdown()
770
771 def _do_read__copied(self):
772 chunk = b'1'
773 zero = True
774 one = False
775
776 try:
777 while True:
778 chunk = self._sslobj.read(self.max_size)
779 if not chunk:
780 break
781 if zero:
782 zero = False
783 one = True
784 first = chunk
785 elif one:
786 one = False
787 data = [first, chunk]
788 else:
789 data.append(chunk)
790 except SSLAgainErrors:
791 pass
792 if one:
793 self._app_protocol.data_received(first)
794 elif not zero:
795 self._app_protocol.data_received(b''.join(data))
796 if not chunk:
797 # close_notify
798 self._call_eof_received()
799 self._start_shutdown()
800
801 def _call_eof_received(self):
802 try:
803 if self._app_state == AppProtocolState.STATE_CON_MADE:
804 self._app_state = AppProtocolState.STATE_EOF
805 keep_open = self._app_protocol.eof_received()
806 if keep_open:
807 logger.warning('returning true from eof_received() '
808 'has no effect when using ssl')
809 except (KeyboardInterrupt, SystemExit):
810 raise
811 except BaseException as ex:
812 self._fatal_error(ex, 'Error calling eof_received()')
813
814 # Flow control for writes from APP socket
815
816 def _control_app_writing(self):
817 size = self._get_write_buffer_size()
818 if size >= self._outgoing_high_water and not self._app_writing_paused:
819 self._app_writing_paused = True
820 try:
821 self._app_protocol.pause_writing()
822 except (KeyboardInterrupt, SystemExit):
823 raise
824 except BaseException as exc:
825 self._loop.call_exception_handler({
826 'message': 'protocol.pause_writing() failed',
827 'exception': exc,
828 'transport': self._app_transport,
829 'protocol': self,
830 })
831 elif size <= self._outgoing_low_water and self._app_writing_paused:
832 self._app_writing_paused = False
833 try:
834 self._app_protocol.resume_writing()
835 except (KeyboardInterrupt, SystemExit):
836 raise
837 except BaseException as exc:
838 self._loop.call_exception_handler({
839 'message': 'protocol.resume_writing() failed',
840 'exception': exc,
841 'transport': self._app_transport,
842 'protocol': self,
843 })
844
845 def _get_write_buffer_size(self):
846 return self._outgoing.pending + self._write_buffer_size
847
848 def _set_write_buffer_limits(self, high=None, low=None):
849 high, low = add_flowcontrol_defaults(
850 high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_WRITE)
851 self._outgoing_high_water = high
852 self._outgoing_low_water = low
853
854 # Flow control for reads to APP socket
855
856 def _pause_reading(self):
857 self._app_reading_paused = True
858
859 def _resume_reading(self):
860 if self._app_reading_paused:
861 self._app_reading_paused = False
862
863 def resume():
864 if self._state == SSLProtocolState.WRAPPED:
865 self._do_read()
866 elif self._state == SSLProtocolState.FLUSHING:
867 self._do_flush()
868 elif self._state == SSLProtocolState.SHUTDOWN:
869 self._do_shutdown()
870 self._loop.call_soon(resume)
871
872 # Flow control for reads from SSL socket
873
874 def _control_ssl_reading(self):
875 size = self._get_read_buffer_size()
876 if size >= self._incoming_high_water and not self._ssl_reading_paused:
877 self._ssl_reading_paused = True
878 self._transport.pause_reading()
879 elif size <= self._incoming_low_water and self._ssl_reading_paused:
880 self._ssl_reading_paused = False
881 self._transport.resume_reading()
882
883 def _set_read_buffer_limits(self, high=None, low=None):
884 high, low = add_flowcontrol_defaults(
885 high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_READ)
886 self._incoming_high_water = high
887 self._incoming_low_water = low
888
889 def _get_read_buffer_size(self):
890 return self._incoming.pending
891
892 # Flow control for writes to SSL socket
893
894 def pause_writing(self):
895 """Called when the low-level transport's buffer goes over
896 the high-water mark.
897 """
898 assert not self._ssl_writing_paused
899 self._ssl_writing_paused = True
900
901 def resume_writing(self):
902 """Called when the low-level transport's buffer drains below
903 the low-water mark.
904 """
905 assert self._ssl_writing_paused
906 self._ssl_writing_paused = False
907 self._process_outgoing()
908
909 def _fatal_error(self, exc, message='Fatal error on transport'):
910 if self._transport:
911 self._transport._force_close(exc)
912
913 if isinstance(exc, OSError):
914 if self._loop.get_debug():
915 logger.debug("%r: %s", self, message, exc_info=True)
916 elif not isinstance(exc, exceptions.CancelledError):
917 self._loop.call_exception_handler({
918 'message': message,
919 'exception': exc,
920 'transport': self._transport,
921 'protocol': self,
922 })