python (3.12.0)

(root)/
lib/
python3.12/
test/
test_asyncio/
test_selector_events.py
       1  """Tests for selector_events.py"""
       2  
       3  import collections
       4  import selectors
       5  import socket
       6  import sys
       7  import unittest
       8  from asyncio import selector_events
       9  from unittest import mock
      10  
      11  try:
      12      import ssl
      13  except ImportError:
      14      ssl = None
      15  
      16  import asyncio
      17  from asyncio.selector_events import (BaseSelectorEventLoop,
      18                                       _SelectorDatagramTransport,
      19                                       _SelectorSocketTransport,
      20                                       _SelectorTransport)
      21  from test.test_asyncio import utils as test_utils
      22  
      23  MOCK_ANY = mock.ANY
      24  
      25  
      26  def tearDownModule():
      27      asyncio.set_event_loop_policy(None)
      28  
      29  
      30  class ESC[4;38;5;81mTestBaseSelectorEventLoop(ESC[4;38;5;149mBaseSelectorEventLoop):
      31  
      32      def _make_self_pipe(self):
      33          self._ssock = mock.Mock()
      34          self._csock = mock.Mock()
      35          self._internal_fds += 1
      36  
      37      def _close_self_pipe(self):
      38          pass
      39  
      40  
      41  def list_to_buffer(l=()):
      42      buffer = collections.deque()
      43      buffer.extend((memoryview(i) for i in l))
      44      return buffer
      45  
      46  
      47  
      48  def close_transport(transport):
      49      # Don't call transport.close() because the event loop and the selector
      50      # are mocked
      51      if transport._sock is None:
      52          return
      53      transport._sock.close()
      54      transport._sock = None
      55  
      56  
      57  class ESC[4;38;5;81mBaseSelectorEventLoopTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      58  
      59      def setUp(self):
      60          super().setUp()
      61          self.selector = mock.Mock()
      62          self.selector.select.return_value = []
      63          self.loop = TestBaseSelectorEventLoop(self.selector)
      64          self.set_event_loop(self.loop)
      65  
      66      def test_make_socket_transport(self):
      67          m = mock.Mock()
      68          self.loop.add_reader = mock.Mock()
      69          self.loop._ensure_fd_no_transport = mock.Mock()
      70          transport = self.loop._make_socket_transport(m, asyncio.Protocol())
      71          self.assertIsInstance(transport, _SelectorSocketTransport)
      72          self.assertEqual(self.loop._ensure_fd_no_transport.call_count, 1)
      73  
      74          # Calling repr() must not fail when the event loop is closed
      75          self.loop.close()
      76          repr(transport)
      77  
      78          close_transport(transport)
      79  
      80      @mock.patch('asyncio.selector_events.ssl', None)
      81      @mock.patch('asyncio.sslproto.ssl', None)
      82      def test_make_ssl_transport_without_ssl_error(self):
      83          m = mock.Mock()
      84          self.loop.add_reader = mock.Mock()
      85          self.loop.add_writer = mock.Mock()
      86          self.loop.remove_reader = mock.Mock()
      87          self.loop.remove_writer = mock.Mock()
      88          self.loop._ensure_fd_no_transport = mock.Mock()
      89          with self.assertRaises(RuntimeError):
      90              self.loop._make_ssl_transport(m, m, m, m)
      91          self.assertEqual(self.loop._ensure_fd_no_transport.call_count, 1)
      92  
      93      def test_close(self):
      94          class ESC[4;38;5;81mEventLoop(ESC[4;38;5;149mBaseSelectorEventLoop):
      95              def _make_self_pipe(self):
      96                  self._ssock = mock.Mock()
      97                  self._csock = mock.Mock()
      98                  self._internal_fds += 1
      99  
     100          self.loop = EventLoop(self.selector)
     101          self.set_event_loop(self.loop)
     102  
     103          ssock = self.loop._ssock
     104          ssock.fileno.return_value = 7
     105          csock = self.loop._csock
     106          csock.fileno.return_value = 1
     107          remove_reader = self.loop._remove_reader = mock.Mock()
     108  
     109          self.loop._selector.close()
     110          self.loop._selector = selector = mock.Mock()
     111          self.assertFalse(self.loop.is_closed())
     112  
     113          self.loop.close()
     114          self.assertTrue(self.loop.is_closed())
     115          self.assertIsNone(self.loop._selector)
     116          self.assertIsNone(self.loop._csock)
     117          self.assertIsNone(self.loop._ssock)
     118          selector.close.assert_called_with()
     119          ssock.close.assert_called_with()
     120          csock.close.assert_called_with()
     121          remove_reader.assert_called_with(7)
     122  
     123          # it should be possible to call close() more than once
     124          self.loop.close()
     125          self.loop.close()
     126  
     127          # operation blocked when the loop is closed
     128          f = self.loop.create_future()
     129          self.assertRaises(RuntimeError, self.loop.run_forever)
     130          self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
     131          fd = 0
     132          def callback():
     133              pass
     134          self.assertRaises(RuntimeError, self.loop.add_reader, fd, callback)
     135          self.assertRaises(RuntimeError, self.loop.add_writer, fd, callback)
     136  
     137      def test_close_no_selector(self):
     138          self.loop.remove_reader = mock.Mock()
     139          self.loop._selector.close()
     140          self.loop._selector = None
     141          self.loop.close()
     142          self.assertIsNone(self.loop._selector)
     143  
     144      def test_read_from_self_tryagain(self):
     145          self.loop._ssock.recv.side_effect = BlockingIOError
     146          self.assertIsNone(self.loop._read_from_self())
     147  
     148      def test_read_from_self_exception(self):
     149          self.loop._ssock.recv.side_effect = OSError
     150          self.assertRaises(OSError, self.loop._read_from_self)
     151  
     152      def test_write_to_self_tryagain(self):
     153          self.loop._csock.send.side_effect = BlockingIOError
     154          with test_utils.disable_logger():
     155              self.assertIsNone(self.loop._write_to_self())
     156  
     157      def test_write_to_self_exception(self):
     158          # _write_to_self() swallows OSError
     159          self.loop._csock.send.side_effect = RuntimeError()
     160          self.assertRaises(RuntimeError, self.loop._write_to_self)
     161  
     162      @mock.patch('socket.getaddrinfo')
     163      def test_sock_connect_resolve_using_socket_params(self, m_gai):
     164          addr = ('need-resolution.com', 8080)
     165          for sock_type in [socket.SOCK_STREAM, socket.SOCK_DGRAM]:
     166              with self.subTest(sock_type):
     167                  sock = test_utils.mock_nonblocking_socket(type=sock_type)
     168  
     169                  m_gai.side_effect = \
     170                      lambda *args: [(None, None, None, None, ('127.0.0.1', 0))]
     171  
     172                  con = self.loop.create_task(self.loop.sock_connect(sock, addr))
     173                  self.loop.run_until_complete(con)
     174                  m_gai.assert_called_with(
     175                      addr[0], addr[1], sock.family, sock.type, sock.proto, 0)
     176  
     177                  self.loop.run_until_complete(con)
     178                  sock.connect.assert_called_with(('127.0.0.1', 0))
     179  
     180      def test_add_reader(self):
     181          self.loop._selector.get_key.side_effect = KeyError
     182          cb = lambda: True
     183          self.loop.add_reader(1, cb)
     184  
     185          self.assertTrue(self.loop._selector.register.called)
     186          fd, mask, (r, w) = self.loop._selector.register.call_args[0]
     187          self.assertEqual(1, fd)
     188          self.assertEqual(selectors.EVENT_READ, mask)
     189          self.assertEqual(cb, r._callback)
     190          self.assertIsNone(w)
     191  
     192      def test_add_reader_existing(self):
     193          reader = mock.Mock()
     194          writer = mock.Mock()
     195          self.loop._selector.get_key.return_value = selectors.SelectorKey(
     196              1, 1, selectors.EVENT_WRITE, (reader, writer))
     197          cb = lambda: True
     198          self.loop.add_reader(1, cb)
     199  
     200          self.assertTrue(reader.cancel.called)
     201          self.assertFalse(self.loop._selector.register.called)
     202          self.assertTrue(self.loop._selector.modify.called)
     203          fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
     204          self.assertEqual(1, fd)
     205          self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
     206          self.assertEqual(cb, r._callback)
     207          self.assertEqual(writer, w)
     208  
     209      def test_add_reader_existing_writer(self):
     210          writer = mock.Mock()
     211          self.loop._selector.get_key.return_value = selectors.SelectorKey(
     212              1, 1, selectors.EVENT_WRITE, (None, writer))
     213          cb = lambda: True
     214          self.loop.add_reader(1, cb)
     215  
     216          self.assertFalse(self.loop._selector.register.called)
     217          self.assertTrue(self.loop._selector.modify.called)
     218          fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
     219          self.assertEqual(1, fd)
     220          self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
     221          self.assertEqual(cb, r._callback)
     222          self.assertEqual(writer, w)
     223  
     224      def test_remove_reader(self):
     225          self.loop._selector.get_key.return_value = selectors.SelectorKey(
     226              1, 1, selectors.EVENT_READ, (None, None))
     227          self.assertFalse(self.loop.remove_reader(1))
     228  
     229          self.assertTrue(self.loop._selector.unregister.called)
     230  
     231      def test_remove_reader_read_write(self):
     232          reader = mock.Mock()
     233          writer = mock.Mock()
     234          self.loop._selector.get_key.return_value = selectors.SelectorKey(
     235              1, 1, selectors.EVENT_READ | selectors.EVENT_WRITE,
     236              (reader, writer))
     237          self.assertTrue(
     238              self.loop.remove_reader(1))
     239  
     240          self.assertFalse(self.loop._selector.unregister.called)
     241          self.assertEqual(
     242              (1, selectors.EVENT_WRITE, (None, writer)),
     243              self.loop._selector.modify.call_args[0])
     244  
     245      def test_remove_reader_unknown(self):
     246          self.loop._selector.get_key.side_effect = KeyError
     247          self.assertFalse(
     248              self.loop.remove_reader(1))
     249  
     250      def test_add_writer(self):
     251          self.loop._selector.get_key.side_effect = KeyError
     252          cb = lambda: True
     253          self.loop.add_writer(1, cb)
     254  
     255          self.assertTrue(self.loop._selector.register.called)
     256          fd, mask, (r, w) = self.loop._selector.register.call_args[0]
     257          self.assertEqual(1, fd)
     258          self.assertEqual(selectors.EVENT_WRITE, mask)
     259          self.assertIsNone(r)
     260          self.assertEqual(cb, w._callback)
     261  
     262      def test_add_writer_existing(self):
     263          reader = mock.Mock()
     264          writer = mock.Mock()
     265          self.loop._selector.get_key.return_value = selectors.SelectorKey(
     266              1, 1, selectors.EVENT_READ, (reader, writer))
     267          cb = lambda: True
     268          self.loop.add_writer(1, cb)
     269  
     270          self.assertTrue(writer.cancel.called)
     271          self.assertFalse(self.loop._selector.register.called)
     272          self.assertTrue(self.loop._selector.modify.called)
     273          fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
     274          self.assertEqual(1, fd)
     275          self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
     276          self.assertEqual(reader, r)
     277          self.assertEqual(cb, w._callback)
     278  
     279      def test_remove_writer(self):
     280          self.loop._selector.get_key.return_value = selectors.SelectorKey(
     281              1, 1, selectors.EVENT_WRITE, (None, None))
     282          self.assertFalse(self.loop.remove_writer(1))
     283  
     284          self.assertTrue(self.loop._selector.unregister.called)
     285  
     286      def test_remove_writer_read_write(self):
     287          reader = mock.Mock()
     288          writer = mock.Mock()
     289          self.loop._selector.get_key.return_value = selectors.SelectorKey(
     290              1, 1, selectors.EVENT_READ | selectors.EVENT_WRITE,
     291              (reader, writer))
     292          self.assertTrue(
     293              self.loop.remove_writer(1))
     294  
     295          self.assertFalse(self.loop._selector.unregister.called)
     296          self.assertEqual(
     297              (1, selectors.EVENT_READ, (reader, None)),
     298              self.loop._selector.modify.call_args[0])
     299  
     300      def test_remove_writer_unknown(self):
     301          self.loop._selector.get_key.side_effect = KeyError
     302          self.assertFalse(
     303              self.loop.remove_writer(1))
     304  
     305      def test_process_events_read(self):
     306          reader = mock.Mock()
     307          reader._cancelled = False
     308  
     309          self.loop._add_callback = mock.Mock()
     310          self.loop._process_events(
     311              [(selectors.SelectorKey(
     312                  1, 1, selectors.EVENT_READ, (reader, None)),
     313                selectors.EVENT_READ)])
     314          self.assertTrue(self.loop._add_callback.called)
     315          self.loop._add_callback.assert_called_with(reader)
     316  
     317      def test_process_events_read_cancelled(self):
     318          reader = mock.Mock()
     319          reader.cancelled = True
     320  
     321          self.loop._remove_reader = mock.Mock()
     322          self.loop._process_events(
     323              [(selectors.SelectorKey(
     324                  1, 1, selectors.EVENT_READ, (reader, None)),
     325               selectors.EVENT_READ)])
     326          self.loop._remove_reader.assert_called_with(1)
     327  
     328      def test_process_events_write(self):
     329          writer = mock.Mock()
     330          writer._cancelled = False
     331  
     332          self.loop._add_callback = mock.Mock()
     333          self.loop._process_events(
     334              [(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
     335                                      (None, writer)),
     336                selectors.EVENT_WRITE)])
     337          self.loop._add_callback.assert_called_with(writer)
     338  
     339      def test_process_events_write_cancelled(self):
     340          writer = mock.Mock()
     341          writer.cancelled = True
     342          self.loop._remove_writer = mock.Mock()
     343  
     344          self.loop._process_events(
     345              [(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
     346                                      (None, writer)),
     347                selectors.EVENT_WRITE)])
     348          self.loop._remove_writer.assert_called_with(1)
     349  
     350      def test_accept_connection_multiple(self):
     351          sock = mock.Mock()
     352          sock.accept.return_value = (mock.Mock(), mock.Mock())
     353          backlog = 100
     354          # Mock the coroutine generation for a connection to prevent
     355          # warnings related to un-awaited coroutines. _accept_connection2
     356          # is an async function that is patched with AsyncMock. create_task
     357          # creates a task out of coroutine returned by AsyncMock, so use
     358          # asyncio.sleep(0) to ensure created tasks are complete to avoid
     359          # task pending warnings.
     360          mock_obj = mock.patch.object
     361          with mock_obj(self.loop, '_accept_connection2') as accept2_mock:
     362              self.loop._accept_connection(
     363                  mock.Mock(), sock, backlog=backlog)
     364          self.loop.run_until_complete(asyncio.sleep(0))
     365          self.assertEqual(sock.accept.call_count, backlog)
     366  
     367  
     368  class ESC[4;38;5;81mSelectorTransportTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     369  
     370      def setUp(self):
     371          super().setUp()
     372          self.loop = self.new_test_loop()
     373          self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
     374          self.sock = mock.Mock(socket.socket)
     375          self.sock.fileno.return_value = 7
     376  
     377      def create_transport(self):
     378          transport = _SelectorTransport(self.loop, self.sock, self.protocol,
     379                                         None)
     380          self.addCleanup(close_transport, transport)
     381          return transport
     382  
     383      def test_ctor(self):
     384          tr = self.create_transport()
     385          self.assertIs(tr._loop, self.loop)
     386          self.assertIs(tr._sock, self.sock)
     387          self.assertIs(tr._sock_fd, 7)
     388  
     389      def test_abort(self):
     390          tr = self.create_transport()
     391          tr._force_close = mock.Mock()
     392  
     393          tr.abort()
     394          tr._force_close.assert_called_with(None)
     395  
     396      def test_close(self):
     397          tr = self.create_transport()
     398          tr.close()
     399  
     400          self.assertTrue(tr.is_closing())
     401          self.assertEqual(1, self.loop.remove_reader_count[7])
     402          self.protocol.connection_lost(None)
     403          self.assertEqual(tr._conn_lost, 1)
     404  
     405          tr.close()
     406          self.assertEqual(tr._conn_lost, 1)
     407          self.assertEqual(1, self.loop.remove_reader_count[7])
     408  
     409      def test_close_write_buffer(self):
     410          tr = self.create_transport()
     411          tr._buffer.extend(b'data')
     412          tr.close()
     413  
     414          self.assertFalse(self.loop.readers)
     415          test_utils.run_briefly(self.loop)
     416          self.assertFalse(self.protocol.connection_lost.called)
     417  
     418      def test_force_close(self):
     419          tr = self.create_transport()
     420          tr._buffer.extend(b'1')
     421          self.loop._add_reader(7, mock.sentinel)
     422          self.loop._add_writer(7, mock.sentinel)
     423          tr._force_close(None)
     424  
     425          self.assertTrue(tr.is_closing())
     426          self.assertEqual(tr._buffer, list_to_buffer())
     427          self.assertFalse(self.loop.readers)
     428          self.assertFalse(self.loop.writers)
     429  
     430          # second close should not remove reader
     431          tr._force_close(None)
     432          self.assertFalse(self.loop.readers)
     433          self.assertEqual(1, self.loop.remove_reader_count[7])
     434  
     435      @mock.patch('asyncio.log.logger.error')
     436      def test_fatal_error(self, m_exc):
     437          exc = OSError()
     438          tr = self.create_transport()
     439          tr._force_close = mock.Mock()
     440          tr._fatal_error(exc)
     441  
     442          m_exc.assert_not_called()
     443  
     444          tr._force_close.assert_called_with(exc)
     445  
     446      @mock.patch('asyncio.log.logger.error')
     447      def test_fatal_error_custom_exception(self, m_exc):
     448          class ESC[4;38;5;81mMyError(ESC[4;38;5;149mException):
     449              pass
     450          exc = MyError()
     451          tr = self.create_transport()
     452          tr._force_close = mock.Mock()
     453          tr._fatal_error(exc)
     454  
     455          m_exc.assert_called_with(
     456              test_utils.MockPattern(
     457                  'Fatal error on transport\nprotocol:.*\ntransport:.*'),
     458              exc_info=(MyError, MOCK_ANY, MOCK_ANY))
     459  
     460          tr._force_close.assert_called_with(exc)
     461  
     462      def test_connection_lost(self):
     463          exc = OSError()
     464          tr = self.create_transport()
     465          self.assertIsNotNone(tr._protocol)
     466          self.assertIsNotNone(tr._loop)
     467          tr._call_connection_lost(exc)
     468  
     469          self.protocol.connection_lost.assert_called_with(exc)
     470          self.sock.close.assert_called_with()
     471          self.assertIsNone(tr._sock)
     472  
     473          self.assertIsNone(tr._protocol)
     474          self.assertIsNone(tr._loop)
     475  
     476      def test__add_reader(self):
     477          tr = self.create_transport()
     478          tr._buffer.extend(b'1')
     479          tr._add_reader(7, mock.sentinel)
     480          self.assertTrue(self.loop.readers)
     481  
     482          tr._force_close(None)
     483  
     484          self.assertTrue(tr.is_closing())
     485          self.assertFalse(self.loop.readers)
     486  
     487          # can not add readers after closing
     488          tr._add_reader(7, mock.sentinel)
     489          self.assertFalse(self.loop.readers)
     490  
     491  
     492  class ESC[4;38;5;81mSelectorSocketTransportTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     493  
     494      def setUp(self):
     495          super().setUp()
     496          self.loop = self.new_test_loop()
     497          self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
     498          self.sock = mock.Mock(socket.socket)
     499          self.sock_fd = self.sock.fileno.return_value = 7
     500  
     501      def socket_transport(self, waiter=None, sendmsg=False):
     502          transport = _SelectorSocketTransport(self.loop, self.sock,
     503                                               self.protocol, waiter=waiter)
     504          if sendmsg:
     505              transport._write_ready = transport._write_sendmsg
     506          else:
     507              transport._write_ready = transport._write_send
     508          self.addCleanup(close_transport, transport)
     509          return transport
     510  
     511      def test_ctor(self):
     512          waiter = self.loop.create_future()
     513          tr = self.socket_transport(waiter=waiter)
     514          self.loop.run_until_complete(waiter)
     515  
     516          self.loop.assert_reader(7, tr._read_ready)
     517          test_utils.run_briefly(self.loop)
     518          self.protocol.connection_made.assert_called_with(tr)
     519  
     520      def test_ctor_with_waiter(self):
     521          waiter = self.loop.create_future()
     522          self.socket_transport(waiter=waiter)
     523          self.loop.run_until_complete(waiter)
     524  
     525          self.assertIsNone(waiter.result())
     526  
     527      def test_pause_resume_reading(self):
     528          tr = self.socket_transport()
     529          test_utils.run_briefly(self.loop)
     530          self.assertFalse(tr._paused)
     531          self.assertTrue(tr.is_reading())
     532          self.loop.assert_reader(7, tr._read_ready)
     533  
     534          tr.pause_reading()
     535          tr.pause_reading()
     536          self.assertTrue(tr._paused)
     537          self.assertFalse(tr.is_reading())
     538          self.loop.assert_no_reader(7)
     539  
     540          tr.resume_reading()
     541          tr.resume_reading()
     542          self.assertFalse(tr._paused)
     543          self.assertTrue(tr.is_reading())
     544          self.loop.assert_reader(7, tr._read_ready)
     545  
     546          tr.close()
     547          self.assertFalse(tr.is_reading())
     548          self.loop.assert_no_reader(7)
     549  
     550      def test_pause_reading_connection_made(self):
     551          tr = self.socket_transport()
     552          self.protocol.connection_made.side_effect = lambda _: tr.pause_reading()
     553          test_utils.run_briefly(self.loop)
     554          self.assertFalse(tr.is_reading())
     555          self.loop.assert_no_reader(7)
     556  
     557          tr.resume_reading()
     558          self.assertTrue(tr.is_reading())
     559          self.loop.assert_reader(7, tr._read_ready)
     560  
     561          tr.close()
     562          self.assertFalse(tr.is_reading())
     563          self.loop.assert_no_reader(7)
     564  
     565  
     566      def test_read_eof_received_error(self):
     567          transport = self.socket_transport()
     568          transport.close = mock.Mock()
     569          transport._fatal_error = mock.Mock()
     570  
     571          self.loop.call_exception_handler = mock.Mock()
     572  
     573          self.protocol.eof_received.side_effect = LookupError()
     574  
     575          self.sock.recv.return_value = b''
     576          transport._read_ready()
     577  
     578          self.protocol.eof_received.assert_called_with()
     579          self.assertTrue(transport._fatal_error.called)
     580  
     581      def test_data_received_error(self):
     582          transport = self.socket_transport()
     583          transport._fatal_error = mock.Mock()
     584  
     585          self.loop.call_exception_handler = mock.Mock()
     586          self.protocol.data_received.side_effect = LookupError()
     587  
     588          self.sock.recv.return_value = b'data'
     589          transport._read_ready()
     590  
     591          self.assertTrue(transport._fatal_error.called)
     592          self.assertTrue(self.protocol.data_received.called)
     593  
     594      def test_read_ready(self):
     595          transport = self.socket_transport()
     596  
     597          self.sock.recv.return_value = b'data'
     598          transport._read_ready()
     599  
     600          self.protocol.data_received.assert_called_with(b'data')
     601  
     602      def test_read_ready_eof(self):
     603          transport = self.socket_transport()
     604          transport.close = mock.Mock()
     605  
     606          self.sock.recv.return_value = b''
     607          transport._read_ready()
     608  
     609          self.protocol.eof_received.assert_called_with()
     610          transport.close.assert_called_with()
     611  
     612      def test_read_ready_eof_keep_open(self):
     613          transport = self.socket_transport()
     614          transport.close = mock.Mock()
     615  
     616          self.sock.recv.return_value = b''
     617          self.protocol.eof_received.return_value = True
     618          transport._read_ready()
     619  
     620          self.protocol.eof_received.assert_called_with()
     621          self.assertFalse(transport.close.called)
     622  
     623      @mock.patch('logging.exception')
     624      def test_read_ready_tryagain(self, m_exc):
     625          self.sock.recv.side_effect = BlockingIOError
     626  
     627          transport = self.socket_transport()
     628          transport._fatal_error = mock.Mock()
     629          transport._read_ready()
     630  
     631          self.assertFalse(transport._fatal_error.called)
     632  
     633      @mock.patch('logging.exception')
     634      def test_read_ready_tryagain_interrupted(self, m_exc):
     635          self.sock.recv.side_effect = InterruptedError
     636  
     637          transport = self.socket_transport()
     638          transport._fatal_error = mock.Mock()
     639          transport._read_ready()
     640  
     641          self.assertFalse(transport._fatal_error.called)
     642  
     643      @mock.patch('logging.exception')
     644      def test_read_ready_conn_reset(self, m_exc):
     645          err = self.sock.recv.side_effect = ConnectionResetError()
     646  
     647          transport = self.socket_transport()
     648          transport._force_close = mock.Mock()
     649          with test_utils.disable_logger():
     650              transport._read_ready()
     651          transport._force_close.assert_called_with(err)
     652  
     653      @mock.patch('logging.exception')
     654      def test_read_ready_err(self, m_exc):
     655          err = self.sock.recv.side_effect = OSError()
     656  
     657          transport = self.socket_transport()
     658          transport._fatal_error = mock.Mock()
     659          transport._read_ready()
     660  
     661          transport._fatal_error.assert_called_with(
     662                                     err,
     663                                     'Fatal read error on socket transport')
     664  
     665      def test_write(self):
     666          data = b'data'
     667          self.sock.send.return_value = len(data)
     668  
     669          transport = self.socket_transport()
     670          transport.write(data)
     671          self.sock.send.assert_called_with(data)
     672  
     673      def test_write_bytearray(self):
     674          data = bytearray(b'data')
     675          self.sock.send.return_value = len(data)
     676  
     677          transport = self.socket_transport()
     678          transport.write(data)
     679          self.sock.send.assert_called_with(data)
     680          self.assertEqual(data, bytearray(b'data'))  # Hasn't been mutated.
     681  
     682      def test_write_memoryview(self):
     683          data = memoryview(b'data')
     684          self.sock.send.return_value = len(data)
     685  
     686          transport = self.socket_transport()
     687          transport.write(data)
     688          self.sock.send.assert_called_with(data)
     689  
     690      def test_write_no_data(self):
     691          transport = self.socket_transport()
     692          transport._buffer.append(memoryview(b'data'))
     693          transport.write(b'')
     694          self.assertFalse(self.sock.send.called)
     695          self.assertEqual(list_to_buffer([b'data']), transport._buffer)
     696  
     697      def test_write_buffer(self):
     698          transport = self.socket_transport()
     699          transport._buffer.append(b'data1')
     700          transport.write(b'data2')
     701          self.assertFalse(self.sock.send.called)
     702          self.assertEqual(list_to_buffer([b'data1', b'data2']),
     703                           transport._buffer)
     704  
     def test_write_partial(self):
         # send() takes only 2 of the 4 bytes: the remainder is queued and
         # a writer callback is registered for fd 7.
         payload = b'data'
         self.sock.send.return_value = 2

         tr = self.socket_transport()
         tr.write(payload)

         self.loop.assert_writer(7, tr._write_ready)
         leftover = list_to_buffer([b'ta'])
         self.assertEqual(leftover, tr._buffer)
     714  
     def test_write_partial_bytearray(self):
         # Partial send of a bytearray: the tail is queued and the caller's
         # object is left untouched.
         payload = bytearray(b'data')
         self.sock.send.return_value = 2

         tr = self.socket_transport()
         tr.write(payload)

         self.loop.assert_writer(7, tr._write_ready)
         leftover = list_to_buffer([b'ta'])
         self.assertEqual(leftover, tr._buffer)
         # Hasn't been mutated.
         self.assertEqual(payload, bytearray(b'data'))
     725  
     def test_write_partial_memoryview(self):
         # Partial send of a memoryview: the unsent tail is queued.
         payload = memoryview(b'data')
         self.sock.send.return_value = 2

         tr = self.socket_transport()
         tr.write(payload)

         self.loop.assert_writer(7, tr._write_ready)
         leftover = list_to_buffer([b'ta'])
         self.assertEqual(leftover, tr._buffer)
     735  
     def test_write_partial_none(self):
         # send() accepts zero bytes: the whole payload stays queued and a
         # writer is registered.
         payload = b'data'
         self.sock.fileno.return_value = 7
         self.sock.send.return_value = 0

         tr = self.socket_transport()
         tr.write(payload)

         self.loop.assert_writer(7, tr._write_ready)
         self.assertEqual(list_to_buffer([b'data']), tr._buffer)
     746  
     def test_write_tryagain(self):
         # BlockingIOError from send() means "retry later": the data is
         # queued and a writer is registered.
         self.sock.send.side_effect = BlockingIOError
         payload = b'data'

         tr = self.socket_transport()
         tr.write(payload)

         self.loop.assert_writer(7, tr._write_ready)
         self.assertEqual(list_to_buffer([b'data']), tr._buffer)
     756  
     def test_write_sendmsg_no_data(self):
         # Writing b'' on the sendmsg-based transport must not call
         # sendmsg() and must leave the queued data as it was.
         self.sock.sendmsg = mock.Mock(return_value=0)
         tr = self.socket_transport(sendmsg=True)
         tr._buffer.append(memoryview(b'data'))

         tr.write(b'')

         self.sock.sendmsg.assert_not_called()
         self.assertEqual(list_to_buffer([b'data']), tr._buffer)
     765  
     @unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg')
     def test_writelines_sendmsg_full(self):
         # sendmsg() reports the whole chunk as written, so no writer
         # callback is left registered.
         chunk = memoryview(b'data')
         self.sock.sendmsg = mock.Mock(return_value=len(chunk))

         tr = self.socket_transport(sendmsg=True)
         tr.writelines([chunk])

         self.sock.sendmsg.assert_called()
         self.assertFalse(self.loop.writers)
     776  
     @unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg')
     def test_writelines_sendmsg_partial(self):
         # sendmsg() reports only 2 bytes written, so a writer callback
         # must be registered.
         chunk = memoryview(b'data')
         self.sock.sendmsg = mock.Mock(return_value=2)

         tr = self.socket_transport(sendmsg=True)
         tr.writelines([chunk])

         self.sock.sendmsg.assert_called()
         self.assertTrue(self.loop.writers)
     787  
     def test_writelines_send_full(self):
         # send() reports the whole chunk as written, so writelines() must
         # not leave a writer callback registered.
         data = memoryview(b'data')
         self.sock.send.return_value = len(data)
         # Configure the socket's own fileno(); setting it on the ``send``
         # child mock would have no effect on the transport.
         self.sock.fileno.return_value = 7

         transport = self.socket_transport()
         transport.writelines([data])
         self.assertTrue(self.sock.send.called)
         self.assertFalse(self.loop.writers)
     797  
     def test_writelines_send_partial(self):
         # send() reports only 2 of 4 bytes written, so writelines() must
         # register a writer callback.
         data = memoryview(b'data')
         self.sock.send.return_value = 2
         # Configure the socket's own fileno(); setting it on the ``send``
         # child mock would have no effect on the transport.
         self.sock.fileno.return_value = 7

         transport = self.socket_transport()
         transport.writelines([data])
         self.assertTrue(self.sock.send.called)
         self.assertTrue(self.loop.writers)
     807  
     @unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg')
     def test_write_sendmsg_full(self):
         # _write_ready() sends the queued chunk completely via sendmsg(),
         # after which the writer registration is gone.
         chunk = memoryview(b'data')
         self.sock.sendmsg = mock.Mock(return_value=len(chunk))

         tr = self.socket_transport(sendmsg=True)
         tr._buffer.append(chunk)
         self.loop._add_writer(7, tr._write_ready)
         tr._write_ready()

         self.sock.sendmsg.assert_called()
         self.assertFalse(self.loop.writers)
     820  
     @unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg')
     def test_write_sendmsg_partial(self):
         chunk = memoryview(b'data')
         # Sent partial data
         self.sock.sendmsg = mock.Mock(return_value=2)

         tr = self.socket_transport(sendmsg=True)
         tr._buffer.append(chunk)
         self.loop._add_writer(7, tr._write_ready)
         tr._write_ready()

         self.sock.sendmsg.assert_called()
         # The writer stays registered and the unsent tail stays queued.
         self.assertTrue(self.loop.writers)
         self.assertEqual(list_to_buffer([b'ta']), tr._buffer)
     836  
     @unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg')
     def test_write_sendmsg_half_buffer(self):
         chunks = [memoryview(b'data1'), memoryview(b'data2')]
         # Sent partial data
         self.sock.sendmsg = mock.Mock(return_value=2)

         tr = self.socket_transport(sendmsg=True)
         tr._buffer.extend(chunks)
         self.loop._add_writer(7, tr._write_ready)
         tr._write_ready()

         self.sock.sendmsg.assert_called()
         self.assertTrue(self.loop.writers)
         # Only the first chunk is trimmed; the second one is untouched.
         remaining = list_to_buffer([b'ta1', b'data2'])
         self.assertEqual(remaining, tr._buffer)
     851  
     @unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg')
     def test_write_sendmsg_OSError(self):
         # An OSError raised by sendmsg() must be reported via _fatal_error().
         data = memoryview(b'data')
         self.sock.sendmsg = mock.Mock()
         err = self.sock.sendmsg.side_effect = OSError()

         transport = self.socket_transport(sendmsg=True)
         transport._fatal_error = mock.Mock()
         # Queue the memoryview as a single chunk; extend() would iterate
         # it and queue four integers instead.
         transport._buffer.append(data)
         # Calls _fatal_error and clears the buffer
         transport._write_ready()
         self.assertTrue(self.sock.sendmsg.called)
         self.assertFalse(self.loop.writers)
         self.assertEqual(list_to_buffer([]), transport._buffer)
         transport._fatal_error.assert_called_with(
                                    err,
                                    'Fatal write error on socket transport')
     869  
     @mock.patch('asyncio.selector_events.logger')
     def test_write_exception(self, m_log):
         exc = OSError()
         self.sock.send.side_effect = exc
         payload = b'data'

         tr = self.socket_transport()
         tr._fatal_error = mock.Mock()
         tr.write(payload)
         tr._fatal_error.assert_called_with(
             exc, 'Fatal write error on socket transport')
         tr._conn_lost = 1

         # With the connection marked lost, write() skips send() and only
         # bumps the counter.
         self.sock.reset_mock()
         tr.write(payload)
         self.sock.send.assert_not_called()
         self.assertEqual(tr._conn_lost, 2)

         # Further writes are expected to end up in a logged warning.
         for _ in range(4):
             tr.write(payload)
         m_log.warning.assert_called_with('socket.send() raised exception.')
     892  
     def test_write_str(self):
         # A str payload is rejected with TypeError.
         tr = self.socket_transport()
         with self.assertRaises(TypeError):
             tr.write('str')
     896  
     def test_write_closing(self):
         # close() sets _conn_lost to 1; a later write() raises it to 2.
         tr = self.socket_transport()
         tr.close()
         self.assertEqual(1, tr._conn_lost)

         tr.write(b'data')
         self.assertEqual(2, tr._conn_lost)
     903  
     def test_write_ready(self):
         # _write_ready() flushes the queued chunk through send() and the
         # writer registration is gone afterwards.
         payload = b'data'
         self.sock.send.return_value = len(payload)

         tr = self.socket_transport()
         tr._buffer.append(payload)
         self.loop._add_writer(7, tr._write_ready)
         tr._write_ready()

         self.sock.send.assert_called()
         self.assertFalse(self.loop.writers)
     914  
     def test_write_ready_closing(self):
         # With _closing set, draining the buffer must also close the
         # socket and report connection_lost(None).
         payload = memoryview(b'data')
         self.sock.send.return_value = len(payload)

         tr = self.socket_transport()
         tr._closing = True
         tr._buffer.append(payload)
         self.loop._add_writer(7, tr._write_ready)
         tr._write_ready()

         self.sock.send.assert_called()
         self.assertFalse(self.loop.writers)
         self.sock.close.assert_called_with()
         self.protocol.connection_lost.assert_called_with(None)
     928  
     @unittest.skipIf(sys.flags.optimize, "Assertions are disabled in optimized mode")
     def test_write_ready_no_data(self):
         tr = self.socket_transport()
         # This is an internal error.
         with self.assertRaises(AssertionError):
             tr._write_ready()
     934  
     def test_write_ready_partial(self):
         # send() takes 2 of 4 queued bytes: the writer stays registered
         # and the tail stays in the buffer.
         payload = memoryview(b'data')
         self.sock.send.return_value = 2

         tr = self.socket_transport()
         tr._buffer.append(payload)
         self.loop._add_writer(7, tr._write_ready)
         tr._write_ready()

         self.loop.assert_writer(7, tr._write_ready)
         self.assertEqual(list_to_buffer([b'ta']), tr._buffer)
     945  
     def test_write_ready_partial_none(self):
         # send() takes nothing: the writer and the whole chunk remain.
         payload = b'data'
         self.sock.send.return_value = 0

         tr = self.socket_transport()
         tr._buffer.append(payload)
         self.loop._add_writer(7, tr._write_ready)
         tr._write_ready()

         self.loop.assert_writer(7, tr._write_ready)
         self.assertEqual(list_to_buffer([b'data']), tr._buffer)
     956  
     def test_write_ready_tryagain(self):
         # BlockingIOError from send() leaves the writer registered and
         # the buffer comparing equal to what was queued.
         self.sock.send.side_effect = BlockingIOError

         tr = self.socket_transport()
         queued = list_to_buffer([b'data1', b'data2'])
         tr._buffer = queued
         self.loop._add_writer(7, tr._write_ready)
         tr._write_ready()

         self.loop.assert_writer(7, tr._write_ready)
         self.assertEqual(queued, tr._buffer)
     968  
     def test_write_ready_exception(self):
         # An OSError raised by send() inside _write_ready() must be
         # reported via _fatal_error().
         err = self.sock.send.side_effect = OSError()

         transport = self.socket_transport()
         transport._fatal_error = mock.Mock()
         # Queue the payload as a single chunk; extend() would iterate the
         # bytes object and queue four integers instead.
         transport._buffer.append(b'data')
         transport._write_ready()
         transport._fatal_error.assert_called_with(
                                    err,
                                    'Fatal write error on socket transport')
     979  
     def test_write_eof(self):
         transport = self.socket_transport()
         self.assertTrue(transport.can_write_eof())

         transport.write_eof()
         self.sock.shutdown.assert_called_with(socket.SHUT_WR)

         # A second write_eof() must not shut the socket down again.
         transport.write_eof()
         self.assertEqual(1, self.sock.shutdown.call_count)
         transport.close()
     988  
     def test_write_eof_buffer(self):
         transport = self.socket_transport()
         self.sock.send.side_effect = BlockingIOError
         transport.write(b'data')
         transport.write_eof()

         # While data is still queued, EOF is recorded but not yet acted on.
         self.assertEqual(transport._buffer, list_to_buffer([b'data']))
         self.assertTrue(transport._eof)
         self.sock.shutdown.assert_not_called()

         # Once send() accepts all 4 bytes, the shutdown goes through.
         self.sock.send.side_effect = lambda _: 4
         transport._write_ready()
         self.sock.send.assert_called()
         self.sock.shutdown.assert_called_with(socket.SHUT_WR)
         transport.close()
    1002  
    def test_write_eof_after_close(self):
        # write_eof() on an already closed transport must simply not raise.
        transport = self.socket_transport()
        transport.close()
        self.loop.run_until_complete(asyncio.sleep(0))
        transport.write_eof()
    1008  
    @mock.patch('asyncio.base_events.logger')
    def test_transport_close_remove_writer(self, m_log):
        # close() must unregister the writer for the transport's fd.
        fake_remove = mock.Mock()
        self.loop._remove_writer = fake_remove

        tr = self.socket_transport()
        tr.close()

        fake_remove.assert_called_with(self.sock_fd)
    1016  
    1017  
    class SelectorSocketTransportBufferedProtocolTests(test_utils.TestCase):
        # Tests for _SelectorSocketTransport paired with a BufferedProtocol
        # (get_buffer() / buffer_updated()) on top of a mocked socket.

        def setUp(self):
            super().setUp()
            self.loop = self.new_test_loop()

            # One-byte bytearray handed out by the mocked get_buffer().
            self.buf = bytearray(1)
            self.protocol = test_utils.make_test_protocol(asyncio.BufferedProtocol)
            self.protocol.get_buffer.side_effect = lambda hint: self.buf

            self.sock = mock.Mock(socket.socket)
            self.sock.fileno.return_value = 7
            self.sock_fd = 7

        def socket_transport(self, waiter=None):
            # Build a transport over the mocked socket and schedule its
            # cleanup through close_transport().
            tr = _SelectorSocketTransport(
                self.loop, self.sock, self.protocol, waiter=waiter)
            self.addCleanup(close_transport, tr)
            return tr

        def test_ctor(self):
            fut = self.loop.create_future()
            transport = self.socket_transport(waiter=fut)
            self.loop.run_until_complete(fut)

            self.loop.assert_reader(7, transport._read_ready)
            test_utils.run_briefly(self.loop)
            self.protocol.connection_made.assert_called_with(transport)

        def test_get_buffer_error(self):
            # get_buffer() raising is fatal; buffer_updated() is never reached.
            tr = self.socket_transport()
            tr._fatal_error = mock.Mock()

            self.loop.call_exception_handler = mock.Mock()
            self.protocol.get_buffer.side_effect = LookupError()

            tr._read_ready()

            tr._fatal_error.assert_called()
            self.protocol.get_buffer.assert_called()
            self.protocol.buffer_updated.assert_not_called()

        def test_get_buffer_zerosized(self):
            # A zero-length buffer from get_buffer() is fatal as well.
            tr = self.socket_transport()
            tr._fatal_error = mock.Mock()

            self.loop.call_exception_handler = mock.Mock()
            self.protocol.get_buffer.side_effect = lambda hint: bytearray(0)

            tr._read_ready()

            tr._fatal_error.assert_called()
            self.protocol.get_buffer.assert_called()
            self.protocol.buffer_updated.assert_not_called()

        def test_proto_type_switch(self):
            # Start with a plain Protocol, which is fed via data_received().
            self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
            tr = self.socket_transport()

            self.sock.recv.return_value = b'data'
            tr._read_ready()

            self.protocol.data_received.assert_called_with(b'data')

            # switch protocol to a BufferedProtocol

            buffered = test_utils.make_test_protocol(asyncio.BufferedProtocol)
            storage = bytearray(4)
            buffered.get_buffer.side_effect = lambda hint: storage

            tr.set_protocol(buffered)

            self.sock.recv_into.return_value = 10
            tr._read_ready()

            buffered.get_buffer.assert_called_with(-1)
            buffered.buffer_updated.assert_called_with(10)

        def test_buffer_updated_error(self):
            # buffer_updated() raising is fatal for the transport.
            tr = self.socket_transport()
            tr._fatal_error = mock.Mock()

            self.loop.call_exception_handler = mock.Mock()
            self.protocol.buffer_updated.side_effect = LookupError()

            self.sock.recv_into.return_value = 10
            tr._read_ready()

            tr._fatal_error.assert_called()
            self.protocol.get_buffer.assert_called()
            self.protocol.buffer_updated.assert_called()

        def test_read_eof_received_error(self):
            # eof_received() raising is fatal for the transport.
            tr = self.socket_transport()
            tr.close = mock.Mock()
            tr._fatal_error = mock.Mock()

            self.loop.call_exception_handler = mock.Mock()

            self.protocol.eof_received.side_effect = LookupError()

            # recv_into() returning 0 triggers the eof_received() call.
            self.sock.recv_into.return_value = 0
            tr._read_ready()

            self.protocol.eof_received.assert_called_with()
            tr._fatal_error.assert_called()

        def test_read_ready(self):
            tr = self.socket_transport()

            self.sock.recv_into.return_value = 10
            tr._read_ready()

            self.protocol.get_buffer.assert_called_with(-1)
            self.protocol.buffer_updated.assert_called_with(10)

        def test_read_ready_eof(self):
            # eof_received() keeps its default mock return value here, and
            # the transport is expected to close itself.
            tr = self.socket_transport()
            tr.close = mock.Mock()

            self.sock.recv_into.return_value = 0
            tr._read_ready()

            self.protocol.eof_received.assert_called_with()
            tr.close.assert_called_with()

        def test_read_ready_eof_keep_open(self):
            # eof_received() returning True keeps the transport open.
            tr = self.socket_transport()
            tr.close = mock.Mock()

            self.sock.recv_into.return_value = 0
            self.protocol.eof_received.return_value = True
            tr._read_ready()

            self.protocol.eof_received.assert_called_with()
            tr.close.assert_not_called()

        @mock.patch('logging.exception')
        def test_read_ready_tryagain(self, m_exc):
            # BlockingIOError from recv_into() is not a fatal error.
            self.sock.recv_into.side_effect = BlockingIOError

            tr = self.socket_transport()
            tr._fatal_error = mock.Mock()
            tr._read_ready()

            tr._fatal_error.assert_not_called()

        @mock.patch('logging.exception')
        def test_read_ready_tryagain_interrupted(self, m_exc):
            # InterruptedError from recv_into() is not a fatal error either.
            self.sock.recv_into.side_effect = InterruptedError

            tr = self.socket_transport()
            tr._fatal_error = mock.Mock()
            tr._read_ready()

            tr._fatal_error.assert_not_called()

        @mock.patch('logging.exception')
        def test_read_ready_conn_reset(self, m_exc):
            # A connection reset is passed on to _force_close().
            exc = ConnectionResetError()
            self.sock.recv_into.side_effect = exc

            tr = self.socket_transport()
            tr._force_close = mock.Mock()
            with test_utils.disable_logger():
                tr._read_ready()
            tr._force_close.assert_called_with(exc)

        @mock.patch('logging.exception')
        def test_read_ready_err(self, m_exc):
            # Any other OSError is reported as a fatal read error.
            exc = OSError()
            self.sock.recv_into.side_effect = exc

            tr = self.socket_transport()
            tr._fatal_error = mock.Mock()
            tr._read_ready()

            tr._fatal_error.assert_called_with(
                exc, 'Fatal read error on socket transport')
    1196  
    1197  class ESC[4;38;5;81mSelectorDatagramTransportTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1198  
    def setUp(self):
        # Fresh test loop, mocked DatagramProtocol, and a strictly spec'd
        # socket mock whose fileno() is 7.
        super().setUp()
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)

        sock = mock.Mock(spec_set=socket.socket)
        sock.fileno.return_value = 7
        self.sock = sock
    1205  
    def datagram_transport(self, address=None):
        # getpeername() raises OSError unless a truthy address is given.
        if address:
            self.sock.getpeername.side_effect = None
        else:
            self.sock.getpeername.side_effect = OSError

        tr = _SelectorDatagramTransport(
            self.loop, self.sock, self.protocol, address=address)
        self.addCleanup(close_transport, tr)
        return tr
    1213  
    def test_read_ready(self):
        # Whatever recvfrom() returns is forwarded to datagram_received().
        tr = self.datagram_transport()
        sender = ('0.0.0.0', 1234)

        self.sock.recvfrom.return_value = (b'data', sender)
        tr._read_ready()

        self.protocol.datagram_received.assert_called_with(b'data', sender)
    1222  
    def test_transport_inheritance(self):
        # The concrete transport is an asyncio.DatagramTransport.
        tr = self.datagram_transport()
        self.assertIsInstance(tr, asyncio.DatagramTransport)
    1226  
    def test_read_ready_tryagain(self):
        # BlockingIOError from recvfrom() is not a fatal error.
        tr = self.datagram_transport()

        self.sock.recvfrom.side_effect = BlockingIOError
        tr._fatal_error = mock.Mock()
        tr._read_ready()

        tr._fatal_error.assert_not_called()
    1235  
    def test_read_ready_err(self):
        # A RuntimeError from recvfrom() is reported as a fatal read error.
        tr = self.datagram_transport()

        exc = RuntimeError()
        self.sock.recvfrom.side_effect = exc
        tr._fatal_error = mock.Mock()
        tr._read_ready()

        tr._fatal_error.assert_called_with(
            exc, 'Fatal read error on datagram transport')
    1246  
    def test_read_ready_oserr(self):
        # An OSError from recvfrom() goes to error_received(), not to
        # _fatal_error().
        tr = self.datagram_transport()

        exc = OSError()
        self.sock.recvfrom.side_effect = exc
        tr._fatal_error = mock.Mock()
        tr._read_ready()

        tr._fatal_error.assert_not_called()
        self.protocol.error_received.assert_called_with(exc)
    1256  
    def test_sendto(self):
        # sendto() forwards payload and address to the socket positionally.
        payload = b'data'
        dest = ('0.0.0.0', 1234)

        tr = self.datagram_transport()
        tr.sendto(payload, dest)

        self.sock.sendto.assert_called()
        positional = self.sock.sendto.call_args[0]
        self.assertEqual(positional, (payload, dest))
    1264  
    def test_sendto_bytearray(self):
        # Same as test_sendto, but with a bytearray payload.
        payload = bytearray(b'data')
        dest = ('0.0.0.0', 1234)

        tr = self.datagram_transport()
        tr.sendto(payload, dest)

        self.sock.sendto.assert_called()
        positional = self.sock.sendto.call_args[0]
        self.assertEqual(positional, (payload, dest))
    1272  
    def test_sendto_memoryview(self):
        # Same as test_sendto, but with a memoryview payload.
        payload = memoryview(b'data')
        dest = ('0.0.0.0', 1234)

        tr = self.datagram_transport()
        tr.sendto(payload, dest)

        self.sock.sendto.assert_called()
        positional = self.sock.sendto.call_args[0]
        self.assertEqual(positional, (payload, dest))
    1280  
    def test_sendto_no_data(self):
        # With a datagram already queued, sendto(b'') is not sent and the
        # queue is left unchanged.
        queued = (b'data', ('0.0.0.0', 12345))
        tr = self.datagram_transport()
        tr._buffer.append(queued)

        tr.sendto(b'', ())

        self.sock.sendto.assert_not_called()
        self.assertEqual([queued], list(tr._buffer))
    1288  
    def test_sendto_buffer(self):
        # With a datagram already queued, sendto() appends instead of sending.
        dest = ('0.0.0.0', 12345)
        tr = self.datagram_transport()
        tr._buffer.append((b'data1', dest))

        tr.sendto(b'data2', dest)

        self.sock.sendto.assert_not_called()
        expected = [(b'data1', dest), (b'data2', dest)]
        self.assertEqual(expected, list(tr._buffer))
    1298  
    def test_sendto_buffer_bytearray(self):
        # A bytearray that has to be queued is stored as bytes.
        dest = ('0.0.0.0', 12345)
        second = bytearray(b'data2')
        tr = self.datagram_transport()
        tr._buffer.append((b'data1', dest))

        tr.sendto(second, dest)

        self.sock.sendto.assert_not_called()
        expected = [(b'data1', dest), (b'data2', dest)]
        self.assertEqual(expected, list(tr._buffer))
        stored_payload = tr._buffer[1][0]
        self.assertIsInstance(stored_payload, bytes)
    1310  
    def test_sendto_buffer_memoryview(self):
        # A memoryview that has to be queued is stored as bytes.
        dest = ('0.0.0.0', 12345)
        second = memoryview(b'data2')
        tr = self.datagram_transport()
        tr._buffer.append((b'data1', dest))

        tr.sendto(second, dest)

        self.sock.sendto.assert_not_called()
        expected = [(b'data1', dest), (b'data2', dest)]
        self.assertEqual(expected, list(tr._buffer))
        stored_payload = tr._buffer[1][0]
        self.assertIsInstance(stored_payload, bytes)
    1322  
    def test_sendto_tryagain(self):
        # BlockingIOError from sendto() queues the datagram and registers
        # _sendto_ready as the writer for fd 7.
        payload = b'data'
        dest = ('0.0.0.0', 12345)
        self.sock.sendto.side_effect = BlockingIOError

        tr = self.datagram_transport()
        tr.sendto(payload, dest)

        self.loop.assert_writer(7, tr._sendto_ready)
        self.assertEqual([(b'data', dest)], list(tr._buffer))
    1334  
    @mock.patch('asyncio.selector_events.logger')
    def test_sendto_exception(self, m_log):
        payload = b'data'
        exc = RuntimeError()
        self.sock.sendto.side_effect = exc

        tr = self.datagram_transport()
        tr._fatal_error = mock.Mock()
        tr.sendto(payload, ())

        tr._fatal_error.assert_called()
        tr._fatal_error.assert_called_with(
            exc, 'Fatal write error on datagram transport')
        tr._conn_lost = 1

        # Repeated sends on the lost connection are expected to end up in
        # a logged warning.
        tr._address = ('123',)
        for _ in range(5):
            tr.sendto(payload)
        m_log.warning.assert_called_with('socket.send() raised exception.')
    1357  
    def test_sendto_error_received(self):
        # ConnectionRefusedError from sendto() is not fatal and does not
        # mark the connection as lost.
        payload = b'data'
        self.sock.sendto.side_effect = ConnectionRefusedError

        tr = self.datagram_transport()
        tr._fatal_error = mock.Mock()
        tr.sendto(payload, ())

        self.assertEqual(0, tr._conn_lost)
        tr._fatal_error.assert_not_called()
    1369  
    def test_sendto_error_received_connected(self):
        # On a transport created with an address, ConnectionRefusedError
        # from send() is delivered through error_received().
        payload = b'data'
        self.sock.send.side_effect = ConnectionRefusedError

        tr = self.datagram_transport(address=('0.0.0.0', 1))
        tr._fatal_error = mock.Mock()
        tr.sendto(payload)

        tr._fatal_error.assert_not_called()
        self.protocol.error_received.assert_called()
    1381  
    def test_sendto_str(self):
        # A str payload is rejected with TypeError.
        tr = self.datagram_transport()
        with self.assertRaises(TypeError):
            tr.sendto('str', ())
    1385  
    def test_sendto_connected_addr(self):
        # Sending to an address other than the one the transport was
        # created with raises ValueError.
        tr = self.datagram_transport(address=('0.0.0.0', 1))
        with self.assertRaises(ValueError):
            tr.sendto(b'str', ('0.0.0.0', 2))
    1390  
    def test_sendto_closing(self):
        # close() sets _conn_lost to 1; a later sendto() raises it to 2.
        tr = self.datagram_transport(address=(1,))
        tr.close()
        self.assertEqual(1, tr._conn_lost)

        tr.sendto(b'data', (1,))
        self.assertEqual(2, tr._conn_lost)
    1397  
    def test_sendto_ready(self):
        # _sendto_ready() sends the queued datagram and the writer
        # registration is gone afterwards.
        payload = b'data'
        dest = ('0.0.0.0', 12345)
        self.sock.sendto.return_value = len(payload)

        tr = self.datagram_transport()
        tr._buffer.append((payload, dest))
        self.loop._add_writer(7, tr._sendto_ready)
        tr._sendto_ready()

        self.sock.sendto.assert_called()
        positional = self.sock.sendto.call_args[0]
        self.assertEqual(positional, (payload, dest))
        self.assertFalse(self.loop.writers)
    1410  
    def test_sendto_ready_closing(self):
        # With _closing set, flushing the last buffered datagram must
        # leave loop.writers empty, close the socket and report
        # connection_lost(None) to the protocol.
        data = b'data'
        # The transport is created without a remote address and the
        # assertion below checks sock.sendto(), so that is the mock to
        # configure (not sock.send()).
        self.sock.sendto.return_value = len(data)

        transport = self.datagram_transport()
        transport._closing = True
        transport._buffer.append((data, ()))
        self.loop._add_writer(7, transport._sendto_ready)
        transport._sendto_ready()
        self.sock.sendto.assert_called_with(data, ())
        self.assertFalse(self.loop.writers)
        self.sock.close.assert_called_with()
        self.protocol.connection_lost.assert_called_with(None)
    1424  
    def test_sendto_ready_no_data(self):
        # With nothing buffered, _sendto_ready() must not call
        # sock.sendto() and loop.writers must end up empty.
        tr = self.datagram_transport()
        self.loop._add_writer(7, tr._sendto_ready)

        tr._sendto_ready()

        self.sock.sendto.assert_not_called()
        self.assertFalse(self.loop.writers)
    1431  
    def test_sendto_ready_tryagain(self):
        # When sock.sendto() raises BlockingIOError, the writer check on
        # the loop must still pass and both datagrams must remain in the
        # buffer in their original order.
        self.sock.sendto.side_effect = BlockingIOError
        pending = [(b'data1', ()), (b'data2', ())]

        tr = self.datagram_transport()
        tr._buffer.extend(pending)
        self.loop._add_writer(7, tr._sendto_ready)
        tr._sendto_ready()

        self.loop.assert_writer(7, tr._sendto_ready)
        self.assertEqual(pending, list(tr._buffer))
    1444  
    def test_sendto_ready_exception(self):
        # A RuntimeError raised by sock.sendto() while flushing the
        # buffer must be passed to _fatal_error() together with the
        # datagram write-error message.
        boom = RuntimeError()
        self.sock.sendto.side_effect = boom

        tr = self.datagram_transport()
        fatal_error = tr._fatal_error = mock.Mock()
        tr._buffer.append((b'data', ()))
        tr._sendto_ready()

        fatal_error.assert_called_with(
            boom, 'Fatal write error on datagram transport')
    1456  
    def test_sendto_ready_error_received(self):
        # ConnectionRefusedError from sock.sendto() while flushing the
        # buffer must not be handed to _fatal_error().
        self.sock.sendto.side_effect = ConnectionRefusedError

        tr = self.datagram_transport()
        fatal_error = tr._fatal_error = mock.Mock()
        tr._buffer.append((b'data', ()))
        tr._sendto_ready()

        fatal_error.assert_not_called()
    1466  
    def test_sendto_ready_error_received_connection(self):
        # On a transport created with a remote address, a
        # ConnectionRefusedError from sock.send() during _sendto_ready()
        # must reach the protocol's error_received() and must not be
        # handed to _fatal_error().
        self.sock.send.side_effect = ConnectionRefusedError

        tr = self.datagram_transport(address=('0.0.0.0', 1))
        fatal_error = tr._fatal_error = mock.Mock()
        tr._buffer.append((b'data', ()))
        tr._sendto_ready()

        fatal_error.assert_not_called()
        self.protocol.error_received.assert_called()
    1477  
    @mock.patch('asyncio.base_events.logger.error')
    def test_fatal_error_connected(self, m_exc):
        # A ConnectionRefusedError given to _fatal_error() on a transport
        # created with a remote address must not be logged through
        # logger.error and must not be passed to error_received().
        tr = self.datagram_transport(address=('0.0.0.0', 1))
        tr._fatal_error(ConnectionRefusedError())

        self.protocol.error_received.assert_not_called()
        m_exc.assert_not_called()
    1485  
    @mock.patch('asyncio.base_events.logger.error')
    def test_fatal_error_connected_custom_error(self, m_exc):
        # A locally defined Exception subclass given to _fatal_error()
        # must be logged through logger.error with matching exc_info,
        # and must not be passed to the protocol's error_received().
        class MyException(Exception):
            pass

        transport = self.datagram_transport(address=('0.0.0.0', 1))
        err = MyException()
        transport._fatal_error(err)
        self.assertFalse(self.protocol.error_received.called)
        # Only the exception type is pinned; the instance and traceback
        # slots of exc_info are matched with MOCK_ANY.
        m_exc.assert_called_with(
            test_utils.MockPattern(
                'Fatal error on transport\nprotocol:.*\ntransport:.*'),
            exc_info=(MyException, MOCK_ANY, MOCK_ANY))
    1498  
    1499  
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()