python (3.12.0)

(root)/
lib/
python3.12/
test/
test_asyncio/
test_proactor_events.py
       1  """Tests for proactor_events.py"""
       2  
       3  import io
       4  import socket
       5  import unittest
       6  import sys
       7  from unittest import mock
       8  
       9  import asyncio
      10  from asyncio.proactor_events import BaseProactorEventLoop
      11  from asyncio.proactor_events import _ProactorSocketTransport
      12  from asyncio.proactor_events import _ProactorWritePipeTransport
      13  from asyncio.proactor_events import _ProactorDuplexPipeTransport
      14  from asyncio.proactor_events import _ProactorDatagramTransport
      15  from test.support import os_helper
      16  from test.support import socket_helper
      17  from test.test_asyncio import utils as test_utils
      18  
      19  
      20  def tearDownModule():
      21      asyncio.set_event_loop_policy(None)
      22  
      23  
      24  def close_transport(transport):
      25      # Don't call transport.close() because the event loop and the IOCP proactor
      26      # are mocked
      27      if transport._sock is None:
      28          return
      29      transport._sock.close()
      30      transport._sock = None
      31  
      32  
      33  class ESC[4;38;5;81mProactorSocketTransportTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      34  
      35      def setUp(self):
      36          super().setUp()
      37          self.loop = self.new_test_loop()
      38          self.addCleanup(self.loop.close)
      39          self.proactor = mock.Mock()
      40          self.loop._proactor = self.proactor
      41          self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
      42          self.sock = mock.Mock(socket.socket)
      43          self.buffer_size = 65536
      44  
      45      def socket_transport(self, waiter=None):
      46          transport = _ProactorSocketTransport(self.loop, self.sock,
      47                                               self.protocol, waiter=waiter)
      48          self.addCleanup(close_transport, transport)
      49          return transport
      50  
      51      def test_ctor(self):
      52          fut = self.loop.create_future()
      53          tr = self.socket_transport(waiter=fut)
      54          test_utils.run_briefly(self.loop)
      55          self.assertIsNone(fut.result())
      56          self.protocol.connection_made(tr)
      57          self.proactor.recv_into.assert_called_with(self.sock, bytearray(self.buffer_size))
      58  
      59      def test_loop_reading(self):
      60          tr = self.socket_transport()
      61          tr._loop_reading()
      62          self.loop._proactor.recv_into.assert_called_with(self.sock, bytearray(self.buffer_size))
      63          self.assertFalse(self.protocol.data_received.called)
      64          self.assertFalse(self.protocol.eof_received.called)
      65  
      66      def test_loop_reading_data(self):
      67          buf = b'data'
      68          res = self.loop.create_future()
      69          res.set_result(len(buf))
      70  
      71          tr = self.socket_transport()
      72          tr._read_fut = res
      73          tr._data[:len(buf)] = buf
      74          tr._loop_reading(res)
      75          called_buf = bytearray(self.buffer_size)
      76          called_buf[:len(buf)] = buf
      77          self.loop._proactor.recv_into.assert_called_with(self.sock, called_buf)
      78          self.protocol.data_received.assert_called_with(buf)
      79          # assert_called_with maps bytearray and bytes to the same thing so check manually
      80          # regression test for https://github.com/python/cpython/issues/99941
      81          self.assertIsInstance(self.protocol.data_received.call_args.args[0], bytes)
      82  
      83      @unittest.skipIf(sys.flags.optimize, "Assertions are disabled in optimized mode")
      84      def test_loop_reading_no_data(self):
      85          res = self.loop.create_future()
      86          res.set_result(0)
      87  
      88          tr = self.socket_transport()
      89          self.assertRaises(AssertionError, tr._loop_reading, res)
      90  
      91          tr.close = mock.Mock()
      92          tr._read_fut = res
      93          tr._loop_reading(res)
      94          self.assertFalse(self.loop._proactor.recv_into.called)
      95          self.assertTrue(self.protocol.eof_received.called)
      96          self.assertTrue(tr.close.called)
      97  
      98      def test_loop_reading_aborted(self):
      99          err = self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
     100  
     101          tr = self.socket_transport()
     102          tr._fatal_error = mock.Mock()
     103          tr._loop_reading()
     104          tr._fatal_error.assert_called_with(
     105                              err,
     106                              'Fatal read error on pipe transport')
     107  
     108      def test_loop_reading_aborted_closing(self):
     109          self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
     110  
     111          tr = self.socket_transport()
     112          tr._closing = True
     113          tr._fatal_error = mock.Mock()
     114          tr._loop_reading()
     115          self.assertFalse(tr._fatal_error.called)
     116  
     117      def test_loop_reading_aborted_is_fatal(self):
     118          self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
     119          tr = self.socket_transport()
     120          tr._closing = False
     121          tr._fatal_error = mock.Mock()
     122          tr._loop_reading()
     123          self.assertTrue(tr._fatal_error.called)
     124  
     125      def test_loop_reading_conn_reset_lost(self):
     126          err = self.loop._proactor.recv_into.side_effect = ConnectionResetError()
     127  
     128          tr = self.socket_transport()
     129          tr._closing = False
     130          tr._fatal_error = mock.Mock()
     131          tr._force_close = mock.Mock()
     132          tr._loop_reading()
     133          self.assertFalse(tr._fatal_error.called)
     134          tr._force_close.assert_called_with(err)
     135  
     136      def test_loop_reading_exception(self):
     137          err = self.loop._proactor.recv_into.side_effect = (OSError())
     138  
     139          tr = self.socket_transport()
     140          tr._fatal_error = mock.Mock()
     141          tr._loop_reading()
     142          tr._fatal_error.assert_called_with(
     143                              err,
     144                              'Fatal read error on pipe transport')
     145  
     146      def test_write(self):
     147          tr = self.socket_transport()
     148          tr._loop_writing = mock.Mock()
     149          tr.write(b'data')
     150          self.assertEqual(tr._buffer, None)
     151          tr._loop_writing.assert_called_with(data=b'data')
     152  
     153      def test_write_no_data(self):
     154          tr = self.socket_transport()
     155          tr.write(b'')
     156          self.assertFalse(tr._buffer)
     157  
     158      def test_write_more(self):
     159          tr = self.socket_transport()
     160          tr._write_fut = mock.Mock()
     161          tr._loop_writing = mock.Mock()
     162          tr.write(b'data')
     163          self.assertEqual(tr._buffer, b'data')
     164          self.assertFalse(tr._loop_writing.called)
     165  
     166      def test_loop_writing(self):
     167          tr = self.socket_transport()
     168          tr._buffer = bytearray(b'data')
     169          tr._loop_writing()
     170          self.loop._proactor.send.assert_called_with(self.sock, b'data')
     171          self.loop._proactor.send.return_value.add_done_callback.\
     172              assert_called_with(tr._loop_writing)
     173  
     174      @mock.patch('asyncio.proactor_events.logger')
     175      def test_loop_writing_err(self, m_log):
     176          err = self.loop._proactor.send.side_effect = OSError()
     177          tr = self.socket_transport()
     178          tr._fatal_error = mock.Mock()
     179          tr._buffer = [b'da', b'ta']
     180          tr._loop_writing()
     181          tr._fatal_error.assert_called_with(
     182                              err,
     183                              'Fatal write error on pipe transport')
     184          tr._conn_lost = 1
     185  
     186          tr.write(b'data')
     187          tr.write(b'data')
     188          tr.write(b'data')
     189          tr.write(b'data')
     190          tr.write(b'data')
     191          self.assertEqual(tr._buffer, None)
     192          m_log.warning.assert_called_with('socket.send() raised exception.')
     193  
     194      def test_loop_writing_stop(self):
     195          fut = self.loop.create_future()
     196          fut.set_result(b'data')
     197  
     198          tr = self.socket_transport()
     199          tr._write_fut = fut
     200          tr._loop_writing(fut)
     201          self.assertIsNone(tr._write_fut)
     202  
     203      def test_loop_writing_closing(self):
     204          fut = self.loop.create_future()
     205          fut.set_result(1)
     206  
     207          tr = self.socket_transport()
     208          tr._write_fut = fut
     209          tr.close()
     210          tr._loop_writing(fut)
     211          self.assertIsNone(tr._write_fut)
     212          test_utils.run_briefly(self.loop)
     213          self.protocol.connection_lost.assert_called_with(None)
     214  
     215      def test_abort(self):
     216          tr = self.socket_transport()
     217          tr._force_close = mock.Mock()
     218          tr.abort()
     219          tr._force_close.assert_called_with(None)
     220  
     221      def test_close(self):
     222          tr = self.socket_transport()
     223          tr.close()
     224          test_utils.run_briefly(self.loop)
     225          self.protocol.connection_lost.assert_called_with(None)
     226          self.assertTrue(tr.is_closing())
     227          self.assertEqual(tr._conn_lost, 1)
     228  
     229          self.protocol.connection_lost.reset_mock()
     230          tr.close()
     231          test_utils.run_briefly(self.loop)
     232          self.assertFalse(self.protocol.connection_lost.called)
     233  
     234      def test_close_write_fut(self):
     235          tr = self.socket_transport()
     236          tr._write_fut = mock.Mock()
     237          tr.close()
     238          test_utils.run_briefly(self.loop)
     239          self.assertFalse(self.protocol.connection_lost.called)
     240  
     241      def test_close_buffer(self):
     242          tr = self.socket_transport()
     243          tr._buffer = [b'data']
     244          tr.close()
     245          test_utils.run_briefly(self.loop)
     246          self.assertFalse(self.protocol.connection_lost.called)
     247  
     248      def test_close_invalid_sockobj(self):
     249          tr = self.socket_transport()
     250          self.sock.fileno.return_value = -1
     251          tr.close()
     252          test_utils.run_briefly(self.loop)
     253          self.protocol.connection_lost.assert_called_with(None)
     254          self.assertFalse(self.sock.shutdown.called)
     255  
     256      @mock.patch('asyncio.base_events.logger')
     257      def test_fatal_error(self, m_logging):
     258          tr = self.socket_transport()
     259          tr._force_close = mock.Mock()
     260          tr._fatal_error(None)
     261          self.assertTrue(tr._force_close.called)
     262          self.assertTrue(m_logging.error.called)
     263  
     264      def test_force_close(self):
     265          tr = self.socket_transport()
     266          tr._buffer = [b'data']
     267          read_fut = tr._read_fut = mock.Mock()
     268          write_fut = tr._write_fut = mock.Mock()
     269          tr._force_close(None)
     270  
     271          read_fut.cancel.assert_called_with()
     272          write_fut.cancel.assert_called_with()
     273          test_utils.run_briefly(self.loop)
     274          self.protocol.connection_lost.assert_called_with(None)
     275          self.assertEqual(None, tr._buffer)
     276          self.assertEqual(tr._conn_lost, 1)
     277  
     278      def test_loop_writing_force_close(self):
     279          exc_handler = mock.Mock()
     280          self.loop.set_exception_handler(exc_handler)
     281          fut = self.loop.create_future()
     282          fut.set_result(1)
     283          self.proactor.send.return_value = fut
     284  
     285          tr = self.socket_transport()
     286          tr.write(b'data')
     287          tr._force_close(None)
     288          test_utils.run_briefly(self.loop)
     289          exc_handler.assert_not_called()
     290  
     291      def test_force_close_idempotent(self):
     292          tr = self.socket_transport()
     293          tr._closing = True
     294          tr._force_close(None)
     295          test_utils.run_briefly(self.loop)
     296          # See https://github.com/python/cpython/issues/89237
     297          # `protocol.connection_lost` should be called even if
     298          # the transport was closed forcefully otherwise
     299          # the resources held by protocol will never be freed
     300          # and waiters will never be notified leading to hang.
     301          self.assertTrue(self.protocol.connection_lost.called)
     302  
     303      def test_force_close_protocol_connection_lost_once(self):
     304          tr = self.socket_transport()
     305          self.assertFalse(self.protocol.connection_lost.called)
     306          tr._closing = True
     307          # Calling _force_close twice should not call
     308          # protocol.connection_lost twice
     309          tr._force_close(None)
     310          tr._force_close(None)
     311          test_utils.run_briefly(self.loop)
     312          self.assertEqual(1, self.protocol.connection_lost.call_count)
     313  
     314      def test_close_protocol_connection_lost_once(self):
     315          tr = self.socket_transport()
     316          self.assertFalse(self.protocol.connection_lost.called)
     317          # Calling close twice should not call
     318          # protocol.connection_lost twice
     319          tr.close()
     320          tr.close()
     321          test_utils.run_briefly(self.loop)
     322          self.assertEqual(1, self.protocol.connection_lost.call_count)
     323  
     324      def test_fatal_error_2(self):
     325          tr = self.socket_transport()
     326          tr._buffer = [b'data']
     327          tr._force_close(None)
     328  
     329          test_utils.run_briefly(self.loop)
     330          self.protocol.connection_lost.assert_called_with(None)
     331          self.assertEqual(None, tr._buffer)
     332  
     333      def test_call_connection_lost(self):
     334          tr = self.socket_transport()
     335          tr._call_connection_lost(None)
     336          self.assertTrue(self.protocol.connection_lost.called)
     337          self.assertTrue(self.sock.close.called)
     338  
     339      def test_write_eof(self):
     340          tr = self.socket_transport()
     341          self.assertTrue(tr.can_write_eof())
     342          tr.write_eof()
     343          self.sock.shutdown.assert_called_with(socket.SHUT_WR)
     344          tr.write_eof()
     345          self.assertEqual(self.sock.shutdown.call_count, 1)
     346          tr.close()
     347  
     348      def test_write_eof_buffer(self):
     349          tr = self.socket_transport()
     350          f = self.loop.create_future()
     351          tr._loop._proactor.send.return_value = f
     352          tr.write(b'data')
     353          tr.write_eof()
     354          self.assertTrue(tr._eof_written)
     355          self.assertFalse(self.sock.shutdown.called)
     356          tr._loop._proactor.send.assert_called_with(self.sock, b'data')
     357          f.set_result(4)
     358          self.loop._run_once()
     359          self.sock.shutdown.assert_called_with(socket.SHUT_WR)
     360          tr.close()
     361  
     362      def test_write_eof_write_pipe(self):
     363          tr = _ProactorWritePipeTransport(
     364              self.loop, self.sock, self.protocol)
     365          self.assertTrue(tr.can_write_eof())
     366          tr.write_eof()
     367          self.assertTrue(tr.is_closing())
     368          self.loop._run_once()
     369          self.assertTrue(self.sock.close.called)
     370          tr.close()
     371  
     372      def test_write_eof_buffer_write_pipe(self):
     373          tr = _ProactorWritePipeTransport(self.loop, self.sock, self.protocol)
     374          f = self.loop.create_future()
     375          tr._loop._proactor.send.return_value = f
     376          tr.write(b'data')
     377          tr.write_eof()
     378          self.assertTrue(tr.is_closing())
     379          self.assertFalse(self.sock.shutdown.called)
     380          tr._loop._proactor.send.assert_called_with(self.sock, b'data')
     381          f.set_result(4)
     382          self.loop._run_once()
     383          self.loop._run_once()
     384          self.assertTrue(self.sock.close.called)
     385          tr.close()
     386  
     387      def test_write_eof_duplex_pipe(self):
     388          tr = _ProactorDuplexPipeTransport(
     389              self.loop, self.sock, self.protocol)
     390          self.assertFalse(tr.can_write_eof())
     391          with self.assertRaises(NotImplementedError):
     392              tr.write_eof()
     393          close_transport(tr)
     394  
     395      def test_pause_resume_reading(self):
     396          tr = self.socket_transport()
     397          index = 0
     398          msgs = [b'data1', b'data2', b'data3', b'data4', b'data5', b'']
     399          reversed_msgs = list(reversed(msgs))
     400  
     401          def recv_into(sock, data):
     402              f = self.loop.create_future()
     403              msg = reversed_msgs.pop()
     404  
     405              result = f.result
     406              def monkey():
     407                  data[:len(msg)] = msg
     408                  return result()
     409              f.result = monkey
     410  
     411              f.set_result(len(msg))
     412              return f
     413  
     414          self.loop._proactor.recv_into.side_effect = recv_into
     415          self.loop._run_once()
     416          self.assertFalse(tr._paused)
     417          self.assertTrue(tr.is_reading())
     418  
     419          for msg in msgs[:2]:
     420              self.loop._run_once()
     421              self.protocol.data_received.assert_called_with(bytearray(msg))
     422  
     423          tr.pause_reading()
     424          tr.pause_reading()
     425          self.assertTrue(tr._paused)
     426          self.assertFalse(tr.is_reading())
     427          for i in range(10):
     428              self.loop._run_once()
     429          self.protocol.data_received.assert_called_with(bytearray(msgs[1]))
     430  
     431          tr.resume_reading()
     432          tr.resume_reading()
     433          self.assertFalse(tr._paused)
     434          self.assertTrue(tr.is_reading())
     435  
     436          for msg in msgs[2:4]:
     437              self.loop._run_once()
     438              self.protocol.data_received.assert_called_with(bytearray(msg))
     439  
     440          tr.pause_reading()
     441          tr.resume_reading()
     442          self.loop.call_exception_handler = mock.Mock()
     443          self.loop._run_once()
     444          self.loop.call_exception_handler.assert_not_called()
     445          self.protocol.data_received.assert_called_with(bytearray(msgs[4]))
     446          tr.close()
     447  
     448          self.assertFalse(tr.is_reading())
     449  
     450      def test_pause_reading_connection_made(self):
     451          tr = self.socket_transport()
     452          self.protocol.connection_made.side_effect = lambda _: tr.pause_reading()
     453          test_utils.run_briefly(self.loop)
     454          self.assertFalse(tr.is_reading())
     455          self.loop.assert_no_reader(7)
     456  
     457          tr.resume_reading()
     458          self.assertTrue(tr.is_reading())
     459  
     460          tr.close()
     461          self.assertFalse(tr.is_reading())
     462  
     463  
     464      def pause_writing_transport(self, high):
     465          tr = self.socket_transport()
     466          tr.set_write_buffer_limits(high=high)
     467  
     468          self.assertEqual(tr.get_write_buffer_size(), 0)
     469          self.assertFalse(self.protocol.pause_writing.called)
     470          self.assertFalse(self.protocol.resume_writing.called)
     471          return tr
     472  
     473      def test_pause_resume_writing(self):
     474          tr = self.pause_writing_transport(high=4)
     475  
     476          # write a large chunk, must pause writing
     477          fut = self.loop.create_future()
     478          self.loop._proactor.send.return_value = fut
     479          tr.write(b'large data')
     480          self.loop._run_once()
     481          self.assertTrue(self.protocol.pause_writing.called)
     482  
     483          # flush the buffer
     484          fut.set_result(None)
     485          self.loop._run_once()
     486          self.assertEqual(tr.get_write_buffer_size(), 0)
     487          self.assertTrue(self.protocol.resume_writing.called)
     488  
     489      def test_pause_writing_2write(self):
     490          tr = self.pause_writing_transport(high=4)
     491  
     492          # first short write, the buffer is not full (3 <= 4)
     493          fut1 = self.loop.create_future()
     494          self.loop._proactor.send.return_value = fut1
     495          tr.write(b'123')
     496          self.loop._run_once()
     497          self.assertEqual(tr.get_write_buffer_size(), 3)
     498          self.assertFalse(self.protocol.pause_writing.called)
     499  
     500          # fill the buffer, must pause writing (6 > 4)
     501          tr.write(b'abc')
     502          self.loop._run_once()
     503          self.assertEqual(tr.get_write_buffer_size(), 6)
     504          self.assertTrue(self.protocol.pause_writing.called)
     505  
     506      def test_pause_writing_3write(self):
     507          tr = self.pause_writing_transport(high=4)
     508  
     509          # first short write, the buffer is not full (1 <= 4)
     510          fut = self.loop.create_future()
     511          self.loop._proactor.send.return_value = fut
     512          tr.write(b'1')
     513          self.loop._run_once()
     514          self.assertEqual(tr.get_write_buffer_size(), 1)
     515          self.assertFalse(self.protocol.pause_writing.called)
     516  
     517          # second short write, the buffer is not full (3 <= 4)
     518          tr.write(b'23')
     519          self.loop._run_once()
     520          self.assertEqual(tr.get_write_buffer_size(), 3)
     521          self.assertFalse(self.protocol.pause_writing.called)
     522  
     523          # fill the buffer, must pause writing (6 > 4)
     524          tr.write(b'abc')
     525          self.loop._run_once()
     526          self.assertEqual(tr.get_write_buffer_size(), 6)
     527          self.assertTrue(self.protocol.pause_writing.called)
     528  
     529      def test_dont_pause_writing(self):
     530          tr = self.pause_writing_transport(high=4)
     531  
     532          # write a large chunk which completes immediately,
     533          # it should not pause writing
     534          fut = self.loop.create_future()
     535          fut.set_result(None)
     536          self.loop._proactor.send.return_value = fut
     537          tr.write(b'very large data')
     538          self.loop._run_once()
     539          self.assertEqual(tr.get_write_buffer_size(), 0)
     540          self.assertFalse(self.protocol.pause_writing.called)
     541  
     542  
class ProactorDatagramTransportTests(test_utils.TestCase):
     544  
     545      def setUp(self):
     546          super().setUp()
     547          self.loop = self.new_test_loop()
     548          self.proactor = mock.Mock()
     549          self.loop._proactor = self.proactor
     550          self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
     551          self.sock = mock.Mock(spec_set=socket.socket)
     552          self.sock.fileno.return_value = 7
     553  
     554      def datagram_transport(self, address=None):
     555          self.sock.getpeername.side_effect = None if address else OSError
     556          transport = _ProactorDatagramTransport(self.loop, self.sock,
     557                                                 self.protocol,
     558                                                 address=address)
     559          self.addCleanup(close_transport, transport)
     560          return transport
     561  
     562      def test_sendto(self):
     563          data = b'data'
     564          transport = self.datagram_transport()
     565          transport.sendto(data, ('0.0.0.0', 1234))
     566          self.assertTrue(self.proactor.sendto.called)
     567          self.proactor.sendto.assert_called_with(
     568              self.sock, data, addr=('0.0.0.0', 1234))
     569  
     570      def test_sendto_bytearray(self):
     571          data = bytearray(b'data')
     572          transport = self.datagram_transport()
     573          transport.sendto(data, ('0.0.0.0', 1234))
     574          self.assertTrue(self.proactor.sendto.called)
     575          self.proactor.sendto.assert_called_with(
     576              self.sock, b'data', addr=('0.0.0.0', 1234))
     577  
     578      def test_sendto_memoryview(self):
     579          data = memoryview(b'data')
     580          transport = self.datagram_transport()
     581          transport.sendto(data, ('0.0.0.0', 1234))
     582          self.assertTrue(self.proactor.sendto.called)
     583          self.proactor.sendto.assert_called_with(
     584              self.sock, b'data', addr=('0.0.0.0', 1234))
     585  
     586      def test_sendto_no_data(self):
     587          transport = self.datagram_transport()
     588          transport._buffer.append((b'data', ('0.0.0.0', 12345)))
     589          transport.sendto(b'', ())
     590          self.assertFalse(self.sock.sendto.called)
     591          self.assertEqual(
     592              [(b'data', ('0.0.0.0', 12345))], list(transport._buffer))
     593  
     594      def test_sendto_buffer(self):
     595          transport = self.datagram_transport()
     596          transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
     597          transport._write_fut = object()
     598          transport.sendto(b'data2', ('0.0.0.0', 12345))
     599          self.assertFalse(self.proactor.sendto.called)
     600          self.assertEqual(
     601              [(b'data1', ('0.0.0.0', 12345)),
     602               (b'data2', ('0.0.0.0', 12345))],
     603              list(transport._buffer))
     604  
     605      def test_sendto_buffer_bytearray(self):
     606          data2 = bytearray(b'data2')
     607          transport = self.datagram_transport()
     608          transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
     609          transport._write_fut = object()
     610          transport.sendto(data2, ('0.0.0.0', 12345))
     611          self.assertFalse(self.proactor.sendto.called)
     612          self.assertEqual(
     613              [(b'data1', ('0.0.0.0', 12345)),
     614               (b'data2', ('0.0.0.0', 12345))],
     615              list(transport._buffer))
     616          self.assertIsInstance(transport._buffer[1][0], bytes)
     617  
     618      def test_sendto_buffer_memoryview(self):
     619          data2 = memoryview(b'data2')
     620          transport = self.datagram_transport()
     621          transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
     622          transport._write_fut = object()
     623          transport.sendto(data2, ('0.0.0.0', 12345))
     624          self.assertFalse(self.proactor.sendto.called)
     625          self.assertEqual(
     626              [(b'data1', ('0.0.0.0', 12345)),
     627               (b'data2', ('0.0.0.0', 12345))],
     628              list(transport._buffer))
     629          self.assertIsInstance(transport._buffer[1][0], bytes)
     630  
     631      @mock.patch('asyncio.proactor_events.logger')
     632      def test_sendto_exception(self, m_log):
     633          data = b'data'
     634          err = self.proactor.sendto.side_effect = RuntimeError()
     635  
     636          transport = self.datagram_transport()
     637          transport._fatal_error = mock.Mock()
     638          transport.sendto(data, ())
     639  
     640          self.assertTrue(transport._fatal_error.called)
     641          transport._fatal_error.assert_called_with(
     642                                     err,
     643                                     'Fatal write error on datagram transport')
     644          transport._conn_lost = 1
     645  
     646          transport._address = ('123',)
     647          transport.sendto(data)
     648          transport.sendto(data)
     649          transport.sendto(data)
     650          transport.sendto(data)
     651          transport.sendto(data)
     652          m_log.warning.assert_called_with('socket.sendto() raised exception.')
     653  
     654      def test_sendto_error_received(self):
     655          data = b'data'
     656  
     657          self.sock.sendto.side_effect = ConnectionRefusedError
     658  
     659          transport = self.datagram_transport()
     660          transport._fatal_error = mock.Mock()
     661          transport.sendto(data, ())
     662  
     663          self.assertEqual(transport._conn_lost, 0)
     664          self.assertFalse(transport._fatal_error.called)
     665  
     666      def test_sendto_error_received_connected(self):
     667          data = b'data'
     668  
     669          self.proactor.send.side_effect = ConnectionRefusedError
     670  
     671          transport = self.datagram_transport(address=('0.0.0.0', 1))
     672          transport._fatal_error = mock.Mock()
     673          transport.sendto(data)
     674  
     675          self.assertFalse(transport._fatal_error.called)
     676          self.assertTrue(self.protocol.error_received.called)
     677  
     678      def test_sendto_str(self):
     679          transport = self.datagram_transport()
     680          self.assertRaises(TypeError, transport.sendto, 'str', ())
     681  
     682      def test_sendto_connected_addr(self):
     683          transport = self.datagram_transport(address=('0.0.0.0', 1))
     684          self.assertRaises(
     685              ValueError, transport.sendto, b'str', ('0.0.0.0', 2))
     686  
     687      def test_sendto_closing(self):
     688          transport = self.datagram_transport(address=(1,))
     689          transport.close()
     690          self.assertEqual(transport._conn_lost, 1)
     691          transport.sendto(b'data', (1,))
     692          self.assertEqual(transport._conn_lost, 2)
     693  
     694      def test__loop_writing_closing(self):
                  # With _closing forced to True, _loop_writing() must leave no
                  # pending write future; after the loop runs briefly the socket
                  # must be closed and connection_lost(None) reported.
     695          transport = self.datagram_transport()
     696          transport._closing = True
     697          transport._loop_writing()
     698          self.assertIsNone(transport._write_fut)
                  # Let the loop run once so scheduled callbacks get executed.
     699          test_utils.run_briefly(self.loop)
     700          self.sock.close.assert_called_with()
     701          self.protocol.connection_lost.assert_called_with(None)
     702  
     703      def test__loop_writing_exception(self):
                  # A RuntimeError raised by proactor.sendto while flushing a
                  # buffered datagram must be passed to _fatal_error together with
                  # the 'Fatal write error on datagram transport' message.
                  # The chained assignment keeps the exception instance in `err`
                  # so the assertion below can compare against that same object.
     704          err = self.proactor.sendto.side_effect = RuntimeError()
     705  
     706          transport = self.datagram_transport()
     707          transport._fatal_error = mock.Mock()
                  # Queue one (data, addr) item directly, bypassing sendto().
     708          transport._buffer.append((b'data', ()))
     709          transport._loop_writing()
     710  
     711          transport._fatal_error.assert_called_with(
     712                                     err,
     713                                     'Fatal write error on datagram transport')
     714  
     715      def test__loop_writing_error_received(self):
                  # ConnectionRefusedError raised by proactor.sendto while flushing
                  # the buffer must not be escalated to _fatal_error.
     716          self.proactor.sendto.side_effect = ConnectionRefusedError
     717  
     718          transport = self.datagram_transport()
     719          transport._fatal_error = mock.Mock()
                  # Queue one (data, addr) item directly, bypassing sendto().
     720          transport._buffer.append((b'data', ()))
     721          transport._loop_writing()
     722  
     723          self.assertFalse(transport._fatal_error.called)
     724  
     725      def test__loop_writing_error_received_connection(self):
                  # Transport created with a remote address: ConnectionRefusedError
                  # raised by proactor.send during _loop_writing() must be reported
                  # via protocol.error_received and must not reach _fatal_error.
     726          self.proactor.send.side_effect = ConnectionRefusedError
     727  
     728          transport = self.datagram_transport(address=('0.0.0.0', 1))
     729          transport._fatal_error = mock.Mock()
                  # Queue one (data, addr) item directly, bypassing sendto().
     730          transport._buffer.append((b'data', ()))
     731          transport._loop_writing()
     732  
     733          self.assertFalse(transport._fatal_error.called)
     734          self.assertTrue(self.protocol.error_received.called)
     735  
     736      @mock.patch('asyncio.base_events.logger.error')
     737      def test_fatal_error_connected(self, m_exc):
                  # Calls the transport's own _fatal_error (not a mock) with a
                  # ConnectionRefusedError on a transport created with an address:
                  # the protocol's error_received must not be invoked and nothing
                  # may be logged through the patched logger.error (m_exc).
     738          transport = self.datagram_transport(address=('0.0.0.0', 1))
     739          err = ConnectionRefusedError()
     740          transport._fatal_error(err)
     741          self.assertFalse(self.protocol.error_received.called)
     742          m_exc.assert_not_called()
     743  
     744  
     745  class ESC[4;38;5;81mBaseProactorEventLoopTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
              # Tests for BaseProactorEventLoop, run against a mock proactor and
              # mock self-pipe sockets instead of real I/O objects.
     746  
     747      def setUp(self):
                  # Build the loop under test. socket.socketpair and
                  # signal.set_wakeup_fd are patched while the constructor runs,
                  # with socketpair returning the mocks self.ssock / self.csock.
     748          super().setUp()
     749  
     750          self.sock = test_utils.mock_nonblocking_socket()
     751          self.proactor = mock.Mock()
     752  
     753          self.ssock, self.csock = mock.Mock(), mock.Mock()
     754  
     755          with mock.patch('asyncio.proactor_events.socket.socketpair',
     756                          return_value=(self.ssock, self.csock)):
     757              with mock.patch('signal.set_wakeup_fd'):
     758                  self.loop = BaseProactorEventLoop(self.proactor)
     759          self.set_event_loop(self.loop)
     760  
     761      @mock.patch('asyncio.proactor_events.socket.socketpair')
     762      def test_ctor(self, socketpair):
                  # The constructor must store the two sockets returned by
                  # socketpair() as _ssock / _csock and report one internal fd.
     763          ssock, csock = socketpair.return_value = (
     764              mock.Mock(), mock.Mock())
     765          with mock.patch('signal.set_wakeup_fd'):
     766              loop = BaseProactorEventLoop(self.proactor)
     767          self.assertIs(loop._ssock, ssock)
     768          self.assertIs(loop._csock, csock)
     769          self.assertEqual(loop._internal_fds, 1)
     770          loop.close()
     771  
     772      def test_close_self_pipe(self):
                  # _close_self_pipe() must close both pipe sockets, reset the
                  # loop's references to them and drop _internal_fds to 0.
     773          self.loop._close_self_pipe()
     774          self.assertEqual(self.loop._internal_fds, 0)
     775          self.assertTrue(self.ssock.close.called)
     776          self.assertTrue(self.csock.close.called)
     777          self.assertIsNone(self.loop._ssock)
     778          self.assertIsNone(self.loop._csock)
     779  
     780          # Don't call close(): _close_self_pipe() cannot be called twice
     781          self.loop._closed = True
     782  
     783      def test_close(self):
                  # close() must call _close_self_pipe() and proactor.close() and
                  # clear _proactor; a second close() must not call
                  # _close_self_pipe() again.
     784          self.loop._close_self_pipe = mock.Mock()
     785          self.loop.close()
     786          self.assertTrue(self.loop._close_self_pipe.called)
     787          self.assertTrue(self.proactor.close.called)
     788          self.assertIsNone(self.loop._proactor)
     789  
                  # Forget the first call so the second close() is judged alone.
     790          self.loop._close_self_pipe.reset_mock()
     791          self.loop.close()
     792          self.assertFalse(self.loop._close_self_pipe.called)
     793  
     794      def test_make_socket_transport(self):
                  # _make_socket_transport() must return a _ProactorSocketTransport.
     795          tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol())
     796          self.assertIsInstance(tr, _ProactorSocketTransport)
     797          close_transport(tr)
     798  
     799      def test_loop_self_reading(self):
                  # Without a future argument, _loop_self_reading() must start a
                  # 4096-byte recv on ssock and register itself as the done
                  # callback of the future returned by proactor.recv.
     800          self.loop._loop_self_reading()
     801          self.proactor.recv.assert_called_with(self.ssock, 4096)
     802          self.proactor.recv.return_value.add_done_callback.assert_called_with(
     803              self.loop._loop_self_reading)
     804  
     805      def test_loop_self_reading_fut(self):
                  # When invoked with the future stored in _self_reading_future,
                  # the result of that future must be fetched and a new recv
                  # started with the same callback.
     806          fut = mock.Mock()
     807          self.loop._self_reading_future = fut
     808          self.loop._loop_self_reading(fut)
     809          self.assertTrue(fut.result.called)
     810          self.proactor.recv.assert_called_with(self.ssock, 4096)
     811          self.proactor.recv.return_value.add_done_callback.assert_called_with(
     812              self.loop._loop_self_reading)
     813  
     814      def test_loop_self_reading_exception(self):
                  # An OSError raised by proactor.recv must be routed to the loop's
                  # call_exception_handler.
     815          self.loop.call_exception_handler = mock.Mock()
     816          self.proactor.recv.side_effect = OSError()
     817          self.loop._loop_self_reading()
     818          self.assertTrue(self.loop.call_exception_handler.called)
     819  
     820      def test_write_to_self(self):
                  # _write_to_self() must send a single NUL byte on csock.
     821          self.loop._write_to_self()
     822          self.csock.send.assert_called_with(b'\0')
     823  
     824      def test_process_events(self):
                  # Only checks that an empty event list is accepted without error.
     825          self.loop._process_events([])
     826  
     827      @mock.patch('asyncio.base_events.logger')
     828      def test_create_server(self, m_log):
                  # Drives the accept callback scheduled by _start_serving()
                  # through three phases: initial accept, a completed accept
                  # future, and a failing accept future.
     829          pf = mock.Mock()
                  # Replace call_soon so the scheduled callback can be captured
                  # and invoked by hand instead of being run by the loop.
     830          call_soon = self.loop.call_soon = mock.Mock()
     831  
     832          self.loop._start_serving(pf, self.sock)
     833          self.assertTrue(call_soon.called)
     834  
     835          # callback
                  # `loop` here is the captured accept callback (first positional
                  # argument given to call_soon), not an event loop.
     836          loop = call_soon.call_args[0][0]
     837          loop()
     838          self.proactor.accept.assert_called_with(self.sock)
     839  
     840          # conn
     841          fut = mock.Mock()
                  # Two-item result; presumably (connection, address) -- confirm
                  # against the accept callback implementation.
     842          fut.result.return_value = (mock.Mock(), mock.Mock())
     843  
     844          make_tr = self.loop._make_socket_transport = mock.Mock()
     845          loop(fut)
     846          self.assertTrue(fut.result.called)
     847          self.assertTrue(make_tr.called)
     848  
     849          # exception
     850          fut.result.side_effect = OSError()
     851          loop(fut)
     852          self.assertTrue(self.sock.close.called)
     853          self.assertTrue(m_log.error.called)
     854  
     855      def test_create_server_cancel(self):
                  # Passing a cancelled future to the accept callback must close
                  # the listening socket.
     856          pf = mock.Mock()
     857          call_soon = self.loop.call_soon = mock.Mock()
     858  
     859          self.loop._start_serving(pf, self.sock)
                  # Captured accept callback (first positional call_soon argument).
     860          loop = call_soon.call_args[0][0]
     861  
     862          # cancelled
     863          fut = self.loop.create_future()
     864          fut.cancel()
     865          loop(fut)
     866          self.assertTrue(self.sock.close.called)
     867  
     868      def test_stop_serving(self):
                  # _stop_serving(sock1) must close sock1, cancel its accept future
                  # and call proactor._stop_serving(sock1), while leaving the
                  # second socket and its future untouched.
     869          sock1 = mock.Mock()
     870          future1 = mock.Mock()
     871          sock2 = mock.Mock()
     872          future2 = mock.Mock()
                  # Accept futures are registered under each socket's fileno().
     873          self.loop._accept_futures = {
     874              sock1.fileno(): future1,
     875              sock2.fileno(): future2
     876          }
     877  
     878          self.loop._stop_serving(sock1)
     879          self.assertTrue(sock1.close.called)
     880          self.assertTrue(future1.cancel.called)
     881          self.proactor._stop_serving.assert_called_with(sock1)
     882          self.assertFalse(sock2.close.called)
     883          self.assertFalse(future2.cancel.called)
     884  
     885      def datagram_transport(self):
                  # Helper: install a fresh test DatagramProtocol on self.protocol
                  # and return the transport the loop builds for self.sock.
     886          self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
     887          return self.loop._make_datagram_transport(self.sock, self.protocol)
     888  
     889      def test_make_datagram_transport(self):
                  # The created transport must be a _ProactorDatagramTransport and
                  # also an asyncio.DatagramTransport.
     890          tr = self.datagram_transport()
     891          self.assertIsInstance(tr, _ProactorDatagramTransport)
     892          self.assertIsInstance(tr, asyncio.DatagramTransport)
     893          close_transport(tr)
     894  
     895      def test_datagram_loop_writing(self):
                  # With one (data, addr) item queued, _loop_writing() must call
                  # proactor.sendto(sock, data, addr=addr) and register itself as
                  # the done callback of the returned future.
     896          tr = self.datagram_transport()
     897          tr._buffer.appendleft((b'data', ('127.0.0.1', 12068)))
     898          tr._loop_writing()
     899          self.loop._proactor.sendto.assert_called_with(self.sock, b'data', addr=('127.0.0.1', 12068))
     900          self.loop._proactor.sendto.return_value.add_done_callback.\
     901              assert_called_with(tr._loop_writing)
     902  
     903          close_transport(tr)
     904  
     905      def test_datagram_loop_reading(self):
                  # Without a future argument, _loop_reading() must request a
                  # recvfrom of 256 KiB and invoke no protocol callbacks.
     906          tr = self.datagram_transport()
     907          tr._loop_reading()
     908          self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024)
     909          self.assertFalse(self.protocol.datagram_received.called)
     910          self.assertFalse(self.protocol.error_received.called)
     911          close_transport(tr)
     912  
     913      def test_datagram_loop_reading_data(self):
                  # A completed read future holding (data, addr) must be delivered
                  # to protocol.datagram_received, and another recvfrom requested.
     914          res = self.loop.create_future()
     915          res.set_result((b'data', ('127.0.0.1', 12068)))
     916  
     917          tr = self.datagram_transport()
                  # Make the future the transport's current read future before
                  # handing it to _loop_reading().
     918          tr._read_fut = res
     919          tr._loop_reading(res)
     920          self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024)
     921          self.protocol.datagram_received.assert_called_with(b'data', ('127.0.0.1', 12068))
     922          close_transport(tr)
     923  
     924      @unittest.skipIf(sys.flags.optimize, "Assertions are disabled in optimized mode")
     925      def test_datagram_loop_reading_no_data(self):
                  # Covers a completed read future whose payload is empty.
     926          res = self.loop.create_future()
     927          res.set_result((b'', ('127.0.0.1', 12068)))
     928  
     929          tr = self.datagram_transport()
                  # Expected to trip an assert inside _loop_reading(); presumably
                  # because `res` is not yet the transport's _read_fut -- confirm
                  # against the implementation.
     930          self.assertRaises(AssertionError, tr._loop_reading, res)
     931  
                  # With close() mocked and `res` installed as the read future, an
                  # empty datagram must trigger another recvfrom without reporting
                  # an error and without closing the transport.
     932          tr.close = mock.Mock()
     933          tr._read_fut = res
     934          tr._loop_reading(res)
     935          self.assertTrue(self.loop._proactor.recvfrom.called)
     936          self.assertFalse(self.protocol.error_received.called)
     937          self.assertFalse(tr.close.called)
     938          close_transport(tr)
     939  
     940      def test_datagram_loop_reading_aborted(self):
                  # ConnectionAbortedError raised by proactor.recvfrom must be
                  # handed to protocol.error_received with that same exception.
     941          err = self.loop._proactor.recvfrom.side_effect = ConnectionAbortedError()
     942  
     943          tr = self.datagram_transport()
     944          tr._fatal_error = mock.Mock()
     945          tr._protocol.error_received = mock.Mock()
     946          tr._loop_reading()
     947          tr._protocol.error_received.assert_called_with(err)
     948          close_transport(tr)
     949  
     950      def test_datagram_loop_writing_aborted(self):
                  # ConnectionAbortedError raised by proactor.sendto must be
                  # handed to protocol.error_received with that same exception.
     951          err = self.loop._proactor.sendto.side_effect = ConnectionAbortedError()
     952  
     953          tr = self.datagram_transport()
     954          tr._fatal_error = mock.Mock()
     955          tr._protocol.error_received = mock.Mock()
     956          tr._buffer.appendleft((b'Hello', ('127.0.0.1', 12068)))
     957          tr._loop_writing()
     958          tr._protocol.error_received.assert_called_with(err)
     959          close_transport(tr)
     960  
     961  
     962  @unittest.skipIf(sys.platform != 'win32',
     963                   'Proactor is supported on Windows only')
     964  class ESC[4;38;5;81mProactorEventLoopUnixSockSendfileTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
              # Tests for the loop's _sock_sendfile_native() using a real
              # asyncio.ProactorEventLoop and real TCP sockets on 127.0.0.1.
              # DATA is the payload written to the shared test file.
     965      DATA = b"12345abcde" * 16 * 1024  # 160 KiB
     966  
     967      class ESC[4;38;5;81mMyProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mProtocol):
                  # Server-side protocol that records connection events, collects
                  # every received chunk in self.data and resolves self.fut once
                  # the connection is lost.
     968  
     969          def __init__(self, loop):
     970              self.started = False
     971              self.closed = False
     972              self.data = bytearray()
     973              self.fut = loop.create_future()
     974              self.transport = None
     975  
     976          def connection_made(self, transport):
     977              self.started = True
     978              self.transport = transport
     979  
     980          def data_received(self, data):
     981              self.data.extend(data)
     982  
     983          def connection_lost(self, exc):
     984              self.closed = True
     985              self.fut.set_result(None)
     986  
     987          async def wait_closed(self):
                      # Completes once connection_lost() has resolved self.fut.
     988              await self.fut
     989  
     990      @classmethod
     991      def setUpClass(cls):
                  # Write DATA to the shared test file once for the whole class.
     992          with open(os_helper.TESTFN, 'wb') as fp:
     993              fp.write(cls.DATA)
     994          super().setUpClass()
     995  
     996      @classmethod
     997      def tearDownClass(cls):
                  # Remove the shared test file created in setUpClass().
     998          os_helper.unlink(os_helper.TESTFN)
     999          super().tearDownClass()
    1000  
    1001      def setUp(self):
                  # Create a fresh ProactorEventLoop and open the test file for
                  # binary reading; both are closed through addCleanup.
    1002          self.loop = asyncio.ProactorEventLoop()
    1003          self.set_event_loop(self.loop)
    1004          self.addCleanup(self.loop.close)
    1005          self.file = open(os_helper.TESTFN, 'rb')
    1006          self.addCleanup(self.file.close)
    1007          super().setUp()
    1008  
    1009      def make_socket(self, cleanup=True):
                  # Return a non-blocking TCP/IPv4 socket whose send and receive
                  # buffer options are set to 1024. When `cleanup` is true the
                  # socket is closed automatically at the end of the test.
    1010          sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    1011          sock.setblocking(False)
    1012          sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
    1013          sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
    1014          if cleanup:
    1015              self.addCleanup(sock.close)
    1016          return sock
    1017  
    1018      def run_loop(self, coro):
                  # Run `coro` to completion on the test loop and return its result.
    1019          return self.loop.run_until_complete(coro)
    1020  
    1021      def prepare(self):
                  # Start a server on an unused 127.0.0.1 port that serves a single
                  # MyProto instance, connect a client socket to it and return
                  # (client socket, protocol). Teardown is registered as a cleanup.
    1022          sock = self.make_socket()
    1023          proto = self.MyProto(self.loop)
    1024          port = socket_helper.find_unused_port()
                  # No automatic cleanup for this socket: it is handed to
                  # create_server(), and the server is closed in cleanup() below.
    1025          srv_sock = self.make_socket(cleanup=False)
    1026          srv_sock.bind(('127.0.0.1', port))
    1027          server = self.run_loop(self.loop.create_server(
    1028              lambda: proto, sock=srv_sock))
    1029          self.run_loop(self.loop.sock_connect(sock, srv_sock.getsockname()))
    1030  
    1031          def cleanup():
    1032              if proto.transport is not None:
    1033                  # can be None if the task was cancelled before
    1034                  # connection_made callback
    1035                  proto.transport.close()
    1036                  self.run_loop(proto.wait_closed())
    1037  
    1038              server.close()
    1039              self.run_loop(server.wait_closed())
    1040  
    1041          self.addCleanup(cleanup)
    1042  
    1043          return sock, proto
    1044  
    1045      def test_sock_sendfile_not_a_file(self):
                  # A plain object() is not a file: _sock_sendfile_native() must
                  # raise SendfileNotAvailableError ("not a regular file"), and the
                  # position of self.file must still be 0.
    1046          sock, proto = self.prepare()
    1047          f = object()
    1048          with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
    1049                                      "not a regular file"):
    1050              self.run_loop(self.loop._sock_sendfile_native(sock, f,
    1051                                                            0, None))
    1052          self.assertEqual(self.file.tell(), 0)
    1053  
    1054      def test_sock_sendfile_iobuffer(self):
                  # An in-memory io.BytesIO must be rejected with
                  # SendfileNotAvailableError ("not a regular file"), and the
                  # position of self.file must still be 0.
    1055          sock, proto = self.prepare()
    1056          f = io.BytesIO()
    1057          with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
    1058                                      "not a regular file"):
    1059              self.run_loop(self.loop._sock_sendfile_native(sock, f,
    1060                                                            0, None))
    1061          self.assertEqual(self.file.tell(), 0)
    1062  
    1063      def test_sock_sendfile_not_regular_file(self):
                  # A file-like mock whose fileno() returns -1 must be rejected
                  # with SendfileNotAvailableError ("not a regular file"), and the
                  # position of self.file must still be 0.
    1064          sock, proto = self.prepare()
    1065          f = mock.Mock()
    1066          f.fileno.return_value = -1
    1067          with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
    1068                                      "not a regular file"):
    1069              self.run_loop(self.loop._sock_sendfile_native(sock, f,
    1070                                                            0, None))
    1071          self.assertEqual(self.file.tell(), 0)
    1072  
    1073  
    1074  if __name__ == '__main__':
              # Run the tests in this module when it is executed as a script.
    1075      unittest.main()