(root)/
Python-3.12.0/
Lib/
asyncio/
sslproto.py
       1  import collections
       2  import enum
       3  import warnings
       4  try:
       5      import ssl
       6  except ImportError:  # pragma: no cover
       7      ssl = None
       8  
       9  from . import constants
      10  from . import exceptions
      11  from . import protocols
      12  from . import transports
      13  from .log import logger
      14  
      15  if ssl is not None:
      16      SSLAgainErrors = (ssl.SSLWantReadError, ssl.SSLSyscallError)
      17  
      18  
      19  class ESC[4;38;5;81mSSLProtocolState(ESC[4;38;5;149menumESC[4;38;5;149m.ESC[4;38;5;149mEnum):
      20      UNWRAPPED = "UNWRAPPED"
      21      DO_HANDSHAKE = "DO_HANDSHAKE"
      22      WRAPPED = "WRAPPED"
      23      FLUSHING = "FLUSHING"
      24      SHUTDOWN = "SHUTDOWN"
      25  
      26  
      27  class ESC[4;38;5;81mAppProtocolState(ESC[4;38;5;149menumESC[4;38;5;149m.ESC[4;38;5;149mEnum):
      28      # This tracks the state of app protocol (https://git.io/fj59P):
      29      #
      30      #     INIT -cm-> CON_MADE [-dr*->] [-er-> EOF?] -cl-> CON_LOST
      31      #
      32      # * cm: connection_made()
      33      # * dr: data_received()
      34      # * er: eof_received()
      35      # * cl: connection_lost()
      36  
      37      STATE_INIT = "STATE_INIT"
      38      STATE_CON_MADE = "STATE_CON_MADE"
      39      STATE_EOF = "STATE_EOF"
      40      STATE_CON_LOST = "STATE_CON_LOST"
      41  
      42  
      43  def _create_transport_context(server_side, server_hostname):
      44      if server_side:
      45          raise ValueError('Server side SSL needs a valid SSLContext')
      46  
      47      # Client side may pass ssl=True to use a default
      48      # context; in that case the sslcontext passed is None.
      49      # The default is secure for client connections.
      50      # Python 3.4+: use up-to-date strong settings.
      51      sslcontext = ssl.create_default_context()
      52      if not server_hostname:
      53          sslcontext.check_hostname = False
      54      return sslcontext
      55  
      56  
      57  def add_flowcontrol_defaults(high, low, kb):
      58      if high is None:
      59          if low is None:
      60              hi = kb * 1024
      61          else:
      62              lo = low
      63              hi = 4 * lo
      64      else:
      65          hi = high
      66      if low is None:
      67          lo = hi // 4
      68      else:
      69          lo = low
      70  
      71      if not hi >= lo >= 0:
      72          raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
      73                           (hi, lo))
      74  
      75      return hi, lo
      76  
      77  
      78  class ESC[4;38;5;81m_SSLProtocolTransport(ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149m_FlowControlMixin,
      79                              ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149mTransport):
      80  
      81      _start_tls_compatible = True
      82      _sendfile_compatible = constants._SendfileMode.FALLBACK
      83  
      84      def __init__(self, loop, ssl_protocol):
      85          self._loop = loop
      86          self._ssl_protocol = ssl_protocol
      87          self._closed = False
      88  
      89      def get_extra_info(self, name, default=None):
      90          """Get optional transport information."""
      91          return self._ssl_protocol._get_extra_info(name, default)
      92  
      93      def set_protocol(self, protocol):
      94          self._ssl_protocol._set_app_protocol(protocol)
      95  
      96      def get_protocol(self):
      97          return self._ssl_protocol._app_protocol
      98  
      99      def is_closing(self):
     100          return self._closed
     101  
     102      def close(self):
     103          """Close the transport.
     104  
     105          Buffered data will be flushed asynchronously.  No more data
     106          will be received.  After all buffered data is flushed, the
     107          protocol's connection_lost() method will (eventually) called
     108          with None as its argument.
     109          """
     110          if not self._closed:
     111              self._closed = True
     112              self._ssl_protocol._start_shutdown()
     113          else:
     114              self._ssl_protocol = None
     115  
     116      def __del__(self, _warnings=warnings):
     117          if not self._closed:
     118              self._closed = True
     119              _warnings.warn(
     120                  "unclosed transport <asyncio._SSLProtocolTransport "
     121                  "object>", ResourceWarning)
     122  
     123      def is_reading(self):
     124          return not self._ssl_protocol._app_reading_paused
     125  
     126      def pause_reading(self):
     127          """Pause the receiving end.
     128  
     129          No data will be passed to the protocol's data_received()
     130          method until resume_reading() is called.
     131          """
     132          self._ssl_protocol._pause_reading()
     133  
     134      def resume_reading(self):
     135          """Resume the receiving end.
     136  
     137          Data received will once again be passed to the protocol's
     138          data_received() method.
     139          """
     140          self._ssl_protocol._resume_reading()
     141  
     142      def set_write_buffer_limits(self, high=None, low=None):
     143          """Set the high- and low-water limits for write flow control.
     144  
     145          These two values control when to call the protocol's
     146          pause_writing() and resume_writing() methods.  If specified,
     147          the low-water limit must be less than or equal to the
     148          high-water limit.  Neither value can be negative.
     149  
     150          The defaults are implementation-specific.  If only the
     151          high-water limit is given, the low-water limit defaults to an
     152          implementation-specific value less than or equal to the
     153          high-water limit.  Setting high to zero forces low to zero as
     154          well, and causes pause_writing() to be called whenever the
     155          buffer becomes non-empty.  Setting low to zero causes
     156          resume_writing() to be called only once the buffer is empty.
     157          Use of zero for either limit is generally sub-optimal as it
     158          reduces opportunities for doing I/O and computation
     159          concurrently.
     160          """
     161          self._ssl_protocol._set_write_buffer_limits(high, low)
     162          self._ssl_protocol._control_app_writing()
     163  
     164      def get_write_buffer_limits(self):
     165          return (self._ssl_protocol._outgoing_low_water,
     166                  self._ssl_protocol._outgoing_high_water)
     167  
     168      def get_write_buffer_size(self):
     169          """Return the current size of the write buffers."""
     170          return self._ssl_protocol._get_write_buffer_size()
     171  
     172      def set_read_buffer_limits(self, high=None, low=None):
     173          """Set the high- and low-water limits for read flow control.
     174  
     175          These two values control when to call the upstream transport's
     176          pause_reading() and resume_reading() methods.  If specified,
     177          the low-water limit must be less than or equal to the
     178          high-water limit.  Neither value can be negative.
     179  
     180          The defaults are implementation-specific.  If only the
     181          high-water limit is given, the low-water limit defaults to an
     182          implementation-specific value less than or equal to the
     183          high-water limit.  Setting high to zero forces low to zero as
     184          well, and causes pause_reading() to be called whenever the
     185          buffer becomes non-empty.  Setting low to zero causes
     186          resume_reading() to be called only once the buffer is empty.
     187          Use of zero for either limit is generally sub-optimal as it
     188          reduces opportunities for doing I/O and computation
     189          concurrently.
     190          """
     191          self._ssl_protocol._set_read_buffer_limits(high, low)
     192          self._ssl_protocol._control_ssl_reading()
     193  
     194      def get_read_buffer_limits(self):
     195          return (self._ssl_protocol._incoming_low_water,
     196                  self._ssl_protocol._incoming_high_water)
     197  
     198      def get_read_buffer_size(self):
     199          """Return the current size of the read buffer."""
     200          return self._ssl_protocol._get_read_buffer_size()
     201  
     202      @property
     203      def _protocol_paused(self):
     204          # Required for sendfile fallback pause_writing/resume_writing logic
     205          return self._ssl_protocol._app_writing_paused
     206  
     207      def write(self, data):
     208          """Write some data bytes to the transport.
     209  
     210          This does not block; it buffers the data and arranges for it
     211          to be sent out asynchronously.
     212          """
     213          if not isinstance(data, (bytes, bytearray, memoryview)):
     214              raise TypeError(f"data: expecting a bytes-like instance, "
     215                              f"got {type(data).__name__}")
     216          if not data:
     217              return
     218          self._ssl_protocol._write_appdata((data,))
     219  
     220      def writelines(self, list_of_data):
     221          """Write a list (or any iterable) of data bytes to the transport.
     222  
     223          The default implementation concatenates the arguments and
     224          calls write() on the result.
     225          """
     226          self._ssl_protocol._write_appdata(list_of_data)
     227  
     228      def write_eof(self):
     229          """Close the write end after flushing buffered data.
     230  
     231          This raises :exc:`NotImplementedError` right now.
     232          """
     233          raise NotImplementedError
     234  
     235      def can_write_eof(self):
     236          """Return True if this transport supports write_eof(), False if not."""
     237          return False
     238  
     239      def abort(self):
     240          """Close the transport immediately.
     241  
     242          Buffered data will be lost.  No more data will be received.
     243          The protocol's connection_lost() method will (eventually) be
     244          called with None as its argument.
     245          """
     246          self._closed = True
     247          if self._ssl_protocol is not None:
     248              self._ssl_protocol._abort()
     249  
     250      def _force_close(self, exc):
     251          self._closed = True
     252          self._ssl_protocol._abort(exc)
     253  
     254      def _test__append_write_backlog(self, data):
     255          # for test only
     256          self._ssl_protocol._write_backlog.append(data)
     257          self._ssl_protocol._write_buffer_size += len(data)
     258  
     259  
     260  class ESC[4;38;5;81mSSLProtocol(ESC[4;38;5;149mprotocolsESC[4;38;5;149m.ESC[4;38;5;149mBufferedProtocol):
     261      max_size = 256 * 1024   # Buffer size passed to read()
     262  
     263      _handshake_start_time = None
     264      _handshake_timeout_handle = None
     265      _shutdown_timeout_handle = None
     266  
     267      def __init__(self, loop, app_protocol, sslcontext, waiter,
     268                   server_side=False, server_hostname=None,
     269                   call_connection_made=True,
     270                   ssl_handshake_timeout=None,
     271                   ssl_shutdown_timeout=None):
     272          if ssl is None:
     273              raise RuntimeError("stdlib ssl module not available")
     274  
     275          self._ssl_buffer = bytearray(self.max_size)
     276          self._ssl_buffer_view = memoryview(self._ssl_buffer)
     277  
     278          if ssl_handshake_timeout is None:
     279              ssl_handshake_timeout = constants.SSL_HANDSHAKE_TIMEOUT
     280          elif ssl_handshake_timeout <= 0:
     281              raise ValueError(
     282                  f"ssl_handshake_timeout should be a positive number, "
     283                  f"got {ssl_handshake_timeout}")
     284          if ssl_shutdown_timeout is None:
     285              ssl_shutdown_timeout = constants.SSL_SHUTDOWN_TIMEOUT
     286          elif ssl_shutdown_timeout <= 0:
     287              raise ValueError(
     288                  f"ssl_shutdown_timeout should be a positive number, "
     289                  f"got {ssl_shutdown_timeout}")
     290  
     291          if not sslcontext:
     292              sslcontext = _create_transport_context(
     293                  server_side, server_hostname)
     294  
     295          self._server_side = server_side
     296          if server_hostname and not server_side:
     297              self._server_hostname = server_hostname
     298          else:
     299              self._server_hostname = None
     300          self._sslcontext = sslcontext
     301          # SSL-specific extra info. More info are set when the handshake
     302          # completes.
     303          self._extra = dict(sslcontext=sslcontext)
     304  
     305          # App data write buffering
     306          self._write_backlog = collections.deque()
     307          self._write_buffer_size = 0
     308  
     309          self._waiter = waiter
     310          self._loop = loop
     311          self._set_app_protocol(app_protocol)
     312          self._app_transport = None
     313          self._app_transport_created = False
     314          # transport, ex: SelectorSocketTransport
     315          self._transport = None
     316          self._ssl_handshake_timeout = ssl_handshake_timeout
     317          self._ssl_shutdown_timeout = ssl_shutdown_timeout
     318          # SSL and state machine
     319          self._incoming = ssl.MemoryBIO()
     320          self._outgoing = ssl.MemoryBIO()
     321          self._state = SSLProtocolState.UNWRAPPED
     322          self._conn_lost = 0  # Set when connection_lost called
     323          if call_connection_made:
     324              self._app_state = AppProtocolState.STATE_INIT
     325          else:
     326              self._app_state = AppProtocolState.STATE_CON_MADE
     327          self._sslobj = self._sslcontext.wrap_bio(
     328              self._incoming, self._outgoing,
     329              server_side=self._server_side,
     330              server_hostname=self._server_hostname)
     331  
     332          # Flow Control
     333  
     334          self._ssl_writing_paused = False
     335  
     336          self._app_reading_paused = False
     337  
     338          self._ssl_reading_paused = False
     339          self._incoming_high_water = 0
     340          self._incoming_low_water = 0
     341          self._set_read_buffer_limits()
     342          self._eof_received = False
     343  
     344          self._app_writing_paused = False
     345          self._outgoing_high_water = 0
     346          self._outgoing_low_water = 0
     347          self._set_write_buffer_limits()
     348          self._get_app_transport()
     349  
     350      def _set_app_protocol(self, app_protocol):
     351          self._app_protocol = app_protocol
     352          # Make fast hasattr check first
     353          if (hasattr(app_protocol, 'get_buffer') and
     354                  isinstance(app_protocol, protocols.BufferedProtocol)):
     355              self._app_protocol_get_buffer = app_protocol.get_buffer
     356              self._app_protocol_buffer_updated = app_protocol.buffer_updated
     357              self._app_protocol_is_buffer = True
     358          else:
     359              self._app_protocol_is_buffer = False
     360  
     361      def _wakeup_waiter(self, exc=None):
     362          if self._waiter is None:
     363              return
     364          if not self._waiter.cancelled():
     365              if exc is not None:
     366                  self._waiter.set_exception(exc)
     367              else:
     368                  self._waiter.set_result(None)
     369          self._waiter = None
     370  
     371      def _get_app_transport(self):
     372          if self._app_transport is None:
     373              if self._app_transport_created:
     374                  raise RuntimeError('Creating _SSLProtocolTransport twice')
     375              self._app_transport = _SSLProtocolTransport(self._loop, self)
     376              self._app_transport_created = True
     377          return self._app_transport
     378  
     379      def connection_made(self, transport):
     380          """Called when the low-level connection is made.
     381  
     382          Start the SSL handshake.
     383          """
     384          self._transport = transport
     385          self._start_handshake()
     386  
     387      def connection_lost(self, exc):
     388          """Called when the low-level connection is lost or closed.
     389  
     390          The argument is an exception object or None (the latter
     391          meaning a regular EOF is received or the connection was
     392          aborted or closed).
     393          """
     394          self._write_backlog.clear()
     395          self._outgoing.read()
     396          self._conn_lost += 1
     397  
     398          # Just mark the app transport as closed so that its __dealloc__
     399          # doesn't complain.
     400          if self._app_transport is not None:
     401              self._app_transport._closed = True
     402  
     403          if self._state != SSLProtocolState.DO_HANDSHAKE:
     404              if (
     405                  self._app_state == AppProtocolState.STATE_CON_MADE or
     406                  self._app_state == AppProtocolState.STATE_EOF
     407              ):
     408                  self._app_state = AppProtocolState.STATE_CON_LOST
     409                  self._loop.call_soon(self._app_protocol.connection_lost, exc)
     410          self._set_state(SSLProtocolState.UNWRAPPED)
     411          self._transport = None
     412          self._app_transport = None
     413          self._app_protocol = None
     414          self._wakeup_waiter(exc)
     415  
     416          if self._shutdown_timeout_handle:
     417              self._shutdown_timeout_handle.cancel()
     418              self._shutdown_timeout_handle = None
     419          if self._handshake_timeout_handle:
     420              self._handshake_timeout_handle.cancel()
     421              self._handshake_timeout_handle = None
     422  
     423      def get_buffer(self, n):
     424          want = n
     425          if want <= 0 or want > self.max_size:
     426              want = self.max_size
     427          if len(self._ssl_buffer) < want:
     428              self._ssl_buffer = bytearray(want)
     429              self._ssl_buffer_view = memoryview(self._ssl_buffer)
     430          return self._ssl_buffer_view
     431  
     432      def buffer_updated(self, nbytes):
     433          self._incoming.write(self._ssl_buffer_view[:nbytes])
     434  
     435          if self._state == SSLProtocolState.DO_HANDSHAKE:
     436              self._do_handshake()
     437  
     438          elif self._state == SSLProtocolState.WRAPPED:
     439              self._do_read()
     440  
     441          elif self._state == SSLProtocolState.FLUSHING:
     442              self._do_flush()
     443  
     444          elif self._state == SSLProtocolState.SHUTDOWN:
     445              self._do_shutdown()
     446  
     447      def eof_received(self):
     448          """Called when the other end of the low-level stream
     449          is half-closed.
     450  
     451          If this returns a false value (including None), the transport
     452          will close itself.  If it returns a true value, closing the
     453          transport is up to the protocol.
     454          """
     455          self._eof_received = True
     456          try:
     457              if self._loop.get_debug():
     458                  logger.debug("%r received EOF", self)
     459  
     460              if self._state == SSLProtocolState.DO_HANDSHAKE:
     461                  self._on_handshake_complete(ConnectionResetError)
     462  
     463              elif self._state == SSLProtocolState.WRAPPED:
     464                  self._set_state(SSLProtocolState.FLUSHING)
     465                  if self._app_reading_paused:
     466                      return True
     467                  else:
     468                      self._do_flush()
     469  
     470              elif self._state == SSLProtocolState.FLUSHING:
     471                  self._do_write()
     472                  self._set_state(SSLProtocolState.SHUTDOWN)
     473                  self._do_shutdown()
     474  
     475              elif self._state == SSLProtocolState.SHUTDOWN:
     476                  self._do_shutdown()
     477  
     478          except Exception:
     479              self._transport.close()
     480              raise
     481  
     482      def _get_extra_info(self, name, default=None):
     483          if name in self._extra:
     484              return self._extra[name]
     485          elif self._transport is not None:
     486              return self._transport.get_extra_info(name, default)
     487          else:
     488              return default
     489  
     490      def _set_state(self, new_state):
     491          allowed = False
     492  
     493          if new_state == SSLProtocolState.UNWRAPPED:
     494              allowed = True
     495  
     496          elif (
     497              self._state == SSLProtocolState.UNWRAPPED and
     498              new_state == SSLProtocolState.DO_HANDSHAKE
     499          ):
     500              allowed = True
     501  
     502          elif (
     503              self._state == SSLProtocolState.DO_HANDSHAKE and
     504              new_state == SSLProtocolState.WRAPPED
     505          ):
     506              allowed = True
     507  
     508          elif (
     509              self._state == SSLProtocolState.WRAPPED and
     510              new_state == SSLProtocolState.FLUSHING
     511          ):
     512              allowed = True
     513  
     514          elif (
     515              self._state == SSLProtocolState.FLUSHING and
     516              new_state == SSLProtocolState.SHUTDOWN
     517          ):
     518              allowed = True
     519  
     520          if allowed:
     521              self._state = new_state
     522  
     523          else:
     524              raise RuntimeError(
     525                  'cannot switch state from {} to {}'.format(
     526                      self._state, new_state))
     527  
     528      # Handshake flow
     529  
     530      def _start_handshake(self):
     531          if self._loop.get_debug():
     532              logger.debug("%r starts SSL handshake", self)
     533              self._handshake_start_time = self._loop.time()
     534          else:
     535              self._handshake_start_time = None
     536  
     537          self._set_state(SSLProtocolState.DO_HANDSHAKE)
     538  
     539          # start handshake timeout count down
     540          self._handshake_timeout_handle = \
     541              self._loop.call_later(self._ssl_handshake_timeout,
     542                                    lambda: self._check_handshake_timeout())
     543  
     544          self._do_handshake()
     545  
     546      def _check_handshake_timeout(self):
     547          if self._state == SSLProtocolState.DO_HANDSHAKE:
     548              msg = (
     549                  f"SSL handshake is taking longer than "
     550                  f"{self._ssl_handshake_timeout} seconds: "
     551                  f"aborting the connection"
     552              )
     553              self._fatal_error(ConnectionAbortedError(msg))
     554  
     555      def _do_handshake(self):
     556          try:
     557              self._sslobj.do_handshake()
     558          except SSLAgainErrors:
     559              self._process_outgoing()
     560          except ssl.SSLError as exc:
     561              self._on_handshake_complete(exc)
     562          else:
     563              self._on_handshake_complete(None)
     564  
     565      def _on_handshake_complete(self, handshake_exc):
     566          if self._handshake_timeout_handle is not None:
     567              self._handshake_timeout_handle.cancel()
     568              self._handshake_timeout_handle = None
     569  
     570          sslobj = self._sslobj
     571          try:
     572              if handshake_exc is None:
     573                  self._set_state(SSLProtocolState.WRAPPED)
     574              else:
     575                  raise handshake_exc
     576  
     577              peercert = sslobj.getpeercert()
     578          except Exception as exc:
     579              self._set_state(SSLProtocolState.UNWRAPPED)
     580              if isinstance(exc, ssl.CertificateError):
     581                  msg = 'SSL handshake failed on verifying the certificate'
     582              else:
     583                  msg = 'SSL handshake failed'
     584              self._fatal_error(exc, msg)
     585              self._wakeup_waiter(exc)
     586              return
     587  
     588          if self._loop.get_debug():
     589              dt = self._loop.time() - self._handshake_start_time
     590              logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
     591  
     592          # Add extra info that becomes available after handshake.
     593          self._extra.update(peercert=peercert,
     594                             cipher=sslobj.cipher(),
     595                             compression=sslobj.compression(),
     596                             ssl_object=sslobj)
     597          if self._app_state == AppProtocolState.STATE_INIT:
     598              self._app_state = AppProtocolState.STATE_CON_MADE
     599              self._app_protocol.connection_made(self._get_app_transport())
     600          self._wakeup_waiter()
     601          self._do_read()
     602  
     603      # Shutdown flow
     604  
     605      def _start_shutdown(self):
     606          if (
     607              self._state in (
     608                  SSLProtocolState.FLUSHING,
     609                  SSLProtocolState.SHUTDOWN,
     610                  SSLProtocolState.UNWRAPPED
     611              )
     612          ):
     613              return
     614          if self._app_transport is not None:
     615              self._app_transport._closed = True
     616          if self._state == SSLProtocolState.DO_HANDSHAKE:
     617              self._abort()
     618          else:
     619              self._set_state(SSLProtocolState.FLUSHING)
     620              self._shutdown_timeout_handle = self._loop.call_later(
     621                  self._ssl_shutdown_timeout,
     622                  lambda: self._check_shutdown_timeout()
     623              )
     624              self._do_flush()
     625  
     626      def _check_shutdown_timeout(self):
     627          if (
     628              self._state in (
     629                  SSLProtocolState.FLUSHING,
     630                  SSLProtocolState.SHUTDOWN
     631              )
     632          ):
     633              self._transport._force_close(
     634                  exceptions.TimeoutError('SSL shutdown timed out'))
     635  
     636      def _do_flush(self):
     637          self._do_read()
     638          self._set_state(SSLProtocolState.SHUTDOWN)
     639          self._do_shutdown()
     640  
     641      def _do_shutdown(self):
     642          try:
     643              if not self._eof_received:
     644                  self._sslobj.unwrap()
     645          except SSLAgainErrors:
     646              self._process_outgoing()
     647          except ssl.SSLError as exc:
     648              self._on_shutdown_complete(exc)
     649          else:
     650              self._process_outgoing()
     651              self._call_eof_received()
     652              self._on_shutdown_complete(None)
     653  
     654      def _on_shutdown_complete(self, shutdown_exc):
     655          if self._shutdown_timeout_handle is not None:
     656              self._shutdown_timeout_handle.cancel()
     657              self._shutdown_timeout_handle = None
     658  
     659          if shutdown_exc:
     660              self._fatal_error(shutdown_exc)
     661          else:
     662              self._loop.call_soon(self._transport.close)
     663  
     664      def _abort(self):
     665          self._set_state(SSLProtocolState.UNWRAPPED)
     666          if self._transport is not None:
     667              self._transport.abort()
     668  
     669      # Outgoing flow
     670  
     671      def _write_appdata(self, list_of_data):
     672          if (
     673              self._state in (
     674                  SSLProtocolState.FLUSHING,
     675                  SSLProtocolState.SHUTDOWN,
     676                  SSLProtocolState.UNWRAPPED
     677              )
     678          ):
     679              if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
     680                  logger.warning('SSL connection is closed')
     681              self._conn_lost += 1
     682              return
     683  
     684          for data in list_of_data:
     685              self._write_backlog.append(data)
     686              self._write_buffer_size += len(data)
     687  
     688          try:
     689              if self._state == SSLProtocolState.WRAPPED:
     690                  self._do_write()
     691  
     692          except Exception as ex:
     693              self._fatal_error(ex, 'Fatal error on SSL protocol')
     694  
     695      def _do_write(self):
     696          try:
     697              while self._write_backlog:
     698                  data = self._write_backlog[0]
     699                  count = self._sslobj.write(data)
     700                  data_len = len(data)
     701                  if count < data_len:
     702                      self._write_backlog[0] = data[count:]
     703                      self._write_buffer_size -= count
     704                  else:
     705                      del self._write_backlog[0]
     706                      self._write_buffer_size -= data_len
     707          except SSLAgainErrors:
     708              pass
     709          self._process_outgoing()
     710  
     711      def _process_outgoing(self):
     712          if not self._ssl_writing_paused:
     713              data = self._outgoing.read()
     714              if len(data):
     715                  self._transport.write(data)
     716          self._control_app_writing()
     717  
     718      # Incoming flow
     719  
     720      def _do_read(self):
     721          if (
     722              self._state not in (
     723                  SSLProtocolState.WRAPPED,
     724                  SSLProtocolState.FLUSHING,
     725              )
     726          ):
     727              return
     728          try:
     729              if not self._app_reading_paused:
     730                  if self._app_protocol_is_buffer:
     731                      self._do_read__buffered()
     732                  else:
     733                      self._do_read__copied()
     734                  if self._write_backlog:
     735                      self._do_write()
     736                  else:
     737                      self._process_outgoing()
     738              self._control_ssl_reading()
     739          except Exception as ex:
     740              self._fatal_error(ex, 'Fatal error on SSL protocol')
     741  
     742      def _do_read__buffered(self):
     743          offset = 0
     744          count = 1
     745  
     746          buf = self._app_protocol_get_buffer(self._get_read_buffer_size())
     747          wants = len(buf)
     748  
     749          try:
     750              count = self._sslobj.read(wants, buf)
     751  
     752              if count > 0:
     753                  offset = count
     754                  while offset < wants:
     755                      count = self._sslobj.read(wants - offset, buf[offset:])
     756                      if count > 0:
     757                          offset += count
     758                      else:
     759                          break
     760                  else:
     761                      self._loop.call_soon(lambda: self._do_read())
     762          except SSLAgainErrors:
     763              pass
     764          if offset > 0:
     765              self._app_protocol_buffer_updated(offset)
     766          if not count:
     767              # close_notify
     768              self._call_eof_received()
     769              self._start_shutdown()
     770  
     771      def _do_read__copied(self):
     772          chunk = b'1'
     773          zero = True
     774          one = False
     775  
     776          try:
     777              while True:
     778                  chunk = self._sslobj.read(self.max_size)
     779                  if not chunk:
     780                      break
     781                  if zero:
     782                      zero = False
     783                      one = True
     784                      first = chunk
     785                  elif one:
     786                      one = False
     787                      data = [first, chunk]
     788                  else:
     789                      data.append(chunk)
     790          except SSLAgainErrors:
     791              pass
     792          if one:
     793              self._app_protocol.data_received(first)
     794          elif not zero:
     795              self._app_protocol.data_received(b''.join(data))
     796          if not chunk:
     797              # close_notify
     798              self._call_eof_received()
     799              self._start_shutdown()
     800  
     801      def _call_eof_received(self):
     802          try:
     803              if self._app_state == AppProtocolState.STATE_CON_MADE:
     804                  self._app_state = AppProtocolState.STATE_EOF
     805                  keep_open = self._app_protocol.eof_received()
     806                  if keep_open:
     807                      logger.warning('returning true from eof_received() '
     808                                     'has no effect when using ssl')
     809          except (KeyboardInterrupt, SystemExit):
     810              raise
     811          except BaseException as ex:
     812              self._fatal_error(ex, 'Error calling eof_received()')
     813  
     814      # Flow control for writes from APP socket
     815  
     816      def _control_app_writing(self):
     817          size = self._get_write_buffer_size()
     818          if size >= self._outgoing_high_water and not self._app_writing_paused:
     819              self._app_writing_paused = True
     820              try:
     821                  self._app_protocol.pause_writing()
     822              except (KeyboardInterrupt, SystemExit):
     823                  raise
     824              except BaseException as exc:
     825                  self._loop.call_exception_handler({
     826                      'message': 'protocol.pause_writing() failed',
     827                      'exception': exc,
     828                      'transport': self._app_transport,
     829                      'protocol': self,
     830                  })
     831          elif size <= self._outgoing_low_water and self._app_writing_paused:
     832              self._app_writing_paused = False
     833              try:
     834                  self._app_protocol.resume_writing()
     835              except (KeyboardInterrupt, SystemExit):
     836                  raise
     837              except BaseException as exc:
     838                  self._loop.call_exception_handler({
     839                      'message': 'protocol.resume_writing() failed',
     840                      'exception': exc,
     841                      'transport': self._app_transport,
     842                      'protocol': self,
     843                  })
     844  
     845      def _get_write_buffer_size(self):
     846          return self._outgoing.pending + self._write_buffer_size
     847  
     848      def _set_write_buffer_limits(self, high=None, low=None):
     849          high, low = add_flowcontrol_defaults(
     850              high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_WRITE)
     851          self._outgoing_high_water = high
     852          self._outgoing_low_water = low
     853  
     854      # Flow control for reads to APP socket
     855  
     856      def _pause_reading(self):
     857          self._app_reading_paused = True
     858  
     859      def _resume_reading(self):
     860          if self._app_reading_paused:
     861              self._app_reading_paused = False
     862  
     863              def resume():
     864                  if self._state == SSLProtocolState.WRAPPED:
     865                      self._do_read()
     866                  elif self._state == SSLProtocolState.FLUSHING:
     867                      self._do_flush()
     868                  elif self._state == SSLProtocolState.SHUTDOWN:
     869                      self._do_shutdown()
     870              self._loop.call_soon(resume)
     871  
     872      # Flow control for reads from SSL socket
     873  
     874      def _control_ssl_reading(self):
     875          size = self._get_read_buffer_size()
     876          if size >= self._incoming_high_water and not self._ssl_reading_paused:
     877              self._ssl_reading_paused = True
     878              self._transport.pause_reading()
     879          elif size <= self._incoming_low_water and self._ssl_reading_paused:
     880              self._ssl_reading_paused = False
     881              self._transport.resume_reading()
     882  
     883      def _set_read_buffer_limits(self, high=None, low=None):
     884          high, low = add_flowcontrol_defaults(
     885              high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_READ)
     886          self._incoming_high_water = high
     887          self._incoming_low_water = low
     888  
     889      def _get_read_buffer_size(self):
     890          return self._incoming.pending
     891  
     892      # Flow control for writes to SSL socket
     893  
     894      def pause_writing(self):
     895          """Called when the low-level transport's buffer goes over
     896          the high-water mark.
     897          """
     898          assert not self._ssl_writing_paused
     899          self._ssl_writing_paused = True
     900  
     901      def resume_writing(self):
     902          """Called when the low-level transport's buffer drains below
     903          the low-water mark.
     904          """
     905          assert self._ssl_writing_paused
     906          self._ssl_writing_paused = False
     907          self._process_outgoing()
     908  
     909      def _fatal_error(self, exc, message='Fatal error on transport'):
     910          if self._transport:
     911              self._transport._force_close(exc)
     912  
     913          if isinstance(exc, OSError):
     914              if self._loop.get_debug():
     915                  logger.debug("%r: %s", self, message, exc_info=True)
     916          elif not isinstance(exc, exceptions.CancelledError):
     917              self._loop.call_exception_handler({
     918                  'message': message,
     919                  'exception': exc,
     920                  'transport': self._transport,
     921                  'protocol': self,
     922              })