(root)/
Python-3.11.7/
Lib/
test/
test_asyncio/
test_transports.py
       1  """Tests for transports.py."""
       2  
       3  import unittest
       4  from unittest import mock
       5  
       6  import asyncio
       7  from asyncio import transports
       8  
       9  
      10  def tearDownModule():
      11      # not needed for the test file but added for uniformness with all other
      12      # asyncio test files for the sake of unified cleanup
      13      asyncio.set_event_loop_policy(None)
      14  
      15  
      16  class ESC[4;38;5;81mTransportTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      17  
      18      def test_ctor_extra_is_none(self):
      19          transport = asyncio.Transport()
      20          self.assertEqual(transport._extra, {})
      21  
      22      def test_get_extra_info(self):
      23          transport = asyncio.Transport({'extra': 'info'})
      24          self.assertEqual('info', transport.get_extra_info('extra'))
      25          self.assertIsNone(transport.get_extra_info('unknown'))
      26  
      27          default = object()
      28          self.assertIs(default, transport.get_extra_info('unknown', default))
      29  
      30      def test_writelines(self):
      31          writer = mock.Mock()
      32  
      33          class ESC[4;38;5;81mMyTransport(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mTransport):
      34              def write(self, data):
      35                  writer(data)
      36  
      37          transport = MyTransport()
      38  
      39          transport.writelines([b'line1',
      40                                bytearray(b'line2'),
      41                                memoryview(b'line3')])
      42          self.assertEqual(1, writer.call_count)
      43          writer.assert_called_with(b'line1line2line3')
      44  
      45      def test_not_implemented(self):
      46          transport = asyncio.Transport()
      47  
      48          self.assertRaises(NotImplementedError,
      49                            transport.set_write_buffer_limits)
      50          self.assertRaises(NotImplementedError, transport.get_write_buffer_size)
      51          self.assertRaises(NotImplementedError, transport.write, 'data')
      52          self.assertRaises(NotImplementedError, transport.write_eof)
      53          self.assertRaises(NotImplementedError, transport.can_write_eof)
      54          self.assertRaises(NotImplementedError, transport.pause_reading)
      55          self.assertRaises(NotImplementedError, transport.resume_reading)
      56          self.assertRaises(NotImplementedError, transport.is_reading)
      57          self.assertRaises(NotImplementedError, transport.close)
      58          self.assertRaises(NotImplementedError, transport.abort)
      59  
      60      def test_dgram_not_implemented(self):
      61          transport = asyncio.DatagramTransport()
      62  
      63          self.assertRaises(NotImplementedError, transport.sendto, 'data')
      64          self.assertRaises(NotImplementedError, transport.abort)
      65  
      66      def test_subprocess_transport_not_implemented(self):
      67          transport = asyncio.SubprocessTransport()
      68  
      69          self.assertRaises(NotImplementedError, transport.get_pid)
      70          self.assertRaises(NotImplementedError, transport.get_returncode)
      71          self.assertRaises(NotImplementedError, transport.get_pipe_transport, 1)
      72          self.assertRaises(NotImplementedError, transport.send_signal, 1)
      73          self.assertRaises(NotImplementedError, transport.terminate)
      74          self.assertRaises(NotImplementedError, transport.kill)
      75  
      76      def test_flowcontrol_mixin_set_write_limits(self):
      77  
      78          class ESC[4;38;5;81mMyTransport(ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149m_FlowControlMixin,
      79                            ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149mTransport):
      80  
      81              def get_write_buffer_size(self):
      82                  return 512
      83  
      84          loop = mock.Mock()
      85          transport = MyTransport(loop=loop)
      86          transport._protocol = mock.Mock()
      87  
      88          self.assertFalse(transport._protocol_paused)
      89  
      90          with self.assertRaisesRegex(ValueError, 'high.*must be >= low'):
      91              transport.set_write_buffer_limits(high=0, low=1)
      92  
      93          transport.set_write_buffer_limits(high=1024, low=128)
      94          self.assertFalse(transport._protocol_paused)
      95          self.assertEqual(transport.get_write_buffer_limits(), (128, 1024))
      96  
      97          transport.set_write_buffer_limits(high=256, low=128)
      98          self.assertTrue(transport._protocol_paused)
      99          self.assertEqual(transport.get_write_buffer_limits(), (128, 256))
     100  
     101  
# Run this module's tests through unittest when it is executed as a script.
if __name__ == '__main__':
    unittest.main()