1 """Tests for transports.py."""
2
3 import unittest
4 from unittest import mock
5
6 import asyncio
7 from asyncio import transports
8
9
10 def tearDownModule():
11 # not needed for the test file but added for uniformness with all other
12 # asyncio test files for the sake of unified cleanup
13 asyncio.set_event_loop_policy(None)
14
15
16 class ESC[4;38;5;81mTransportTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
17
18 def test_ctor_extra_is_none(self):
19 transport = asyncio.Transport()
20 self.assertEqual(transport._extra, {})
21
22 def test_get_extra_info(self):
23 transport = asyncio.Transport({'extra': 'info'})
24 self.assertEqual('info', transport.get_extra_info('extra'))
25 self.assertIsNone(transport.get_extra_info('unknown'))
26
27 default = object()
28 self.assertIs(default, transport.get_extra_info('unknown', default))
29
30 def test_writelines(self):
31 writer = mock.Mock()
32
33 class ESC[4;38;5;81mMyTransport(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mTransport):
34 def write(self, data):
35 writer(data)
36
37 transport = MyTransport()
38
39 transport.writelines([b'line1',
40 bytearray(b'line2'),
41 memoryview(b'line3')])
42 self.assertEqual(1, writer.call_count)
43 writer.assert_called_with(b'line1line2line3')
44
45 def test_not_implemented(self):
46 transport = asyncio.Transport()
47
48 self.assertRaises(NotImplementedError,
49 transport.set_write_buffer_limits)
50 self.assertRaises(NotImplementedError, transport.get_write_buffer_size)
51 self.assertRaises(NotImplementedError, transport.write, 'data')
52 self.assertRaises(NotImplementedError, transport.write_eof)
53 self.assertRaises(NotImplementedError, transport.can_write_eof)
54 self.assertRaises(NotImplementedError, transport.pause_reading)
55 self.assertRaises(NotImplementedError, transport.resume_reading)
56 self.assertRaises(NotImplementedError, transport.is_reading)
57 self.assertRaises(NotImplementedError, transport.close)
58 self.assertRaises(NotImplementedError, transport.abort)
59
60 def test_dgram_not_implemented(self):
61 transport = asyncio.DatagramTransport()
62
63 self.assertRaises(NotImplementedError, transport.sendto, 'data')
64 self.assertRaises(NotImplementedError, transport.abort)
65
66 def test_subprocess_transport_not_implemented(self):
67 transport = asyncio.SubprocessTransport()
68
69 self.assertRaises(NotImplementedError, transport.get_pid)
70 self.assertRaises(NotImplementedError, transport.get_returncode)
71 self.assertRaises(NotImplementedError, transport.get_pipe_transport, 1)
72 self.assertRaises(NotImplementedError, transport.send_signal, 1)
73 self.assertRaises(NotImplementedError, transport.terminate)
74 self.assertRaises(NotImplementedError, transport.kill)
75
76 def test_flowcontrol_mixin_set_write_limits(self):
77
78 class ESC[4;38;5;81mMyTransport(ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149m_FlowControlMixin,
79 ESC[4;38;5;149mtransportsESC[4;38;5;149m.ESC[4;38;5;149mTransport):
80
81 def get_write_buffer_size(self):
82 return 512
83
84 loop = mock.Mock()
85 transport = MyTransport(loop=loop)
86 transport._protocol = mock.Mock()
87
88 self.assertFalse(transport._protocol_paused)
89
90 with self.assertRaisesRegex(ValueError, 'high.*must be >= low'):
91 transport.set_write_buffer_limits(high=0, low=1)
92
93 transport.set_write_buffer_limits(high=1024, low=128)
94 self.assertFalse(transport._protocol_paused)
95 self.assertEqual(transport.get_write_buffer_limits(), (128, 1024))
96
97 transport.set_write_buffer_limits(high=256, low=128)
98 self.assertTrue(transport._protocol_paused)
99 self.assertEqual(transport.get_write_buffer_limits(), (128, 256))
100
101
102 if __name__ == '__main__':
103 unittest.main()