(root)/
Python-3.11.7/
Lib/
test/
test_asyncio/
test_streams.py
       1  """Tests for streams.py."""
       2  
       3  import gc
       4  import os
       5  import queue
       6  import pickle
       7  import socket
       8  import sys
       9  import threading
      10  import unittest
      11  from unittest import mock
      12  from test.support import socket_helper
      13  try:
      14      import ssl
      15  except ImportError:
      16      ssl = None
      17  
      18  import asyncio
      19  from test.test_asyncio import utils as test_utils
      20  
      21  
      22  def tearDownModule():
      23      asyncio.set_event_loop_policy(None)
      24  
      25  
      26  class ESC[4;38;5;81mStreamTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      27  
      28      DATA = b'line1\nline2\nline3\n'
      29  
      30      def setUp(self):
      31          super().setUp()
      32          self.loop = asyncio.new_event_loop()
      33          self.set_event_loop(self.loop)
      34  
      35      def tearDown(self):
      36          # just in case if we have transport close callbacks
      37          test_utils.run_briefly(self.loop)
      38  
      39          # set_event_loop() takes care of closing self.loop in a safe way
      40          super().tearDown()
      41  
      42      def _basetest_open_connection(self, open_connection_fut):
      43          messages = []
      44          self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
      45          reader, writer = self.loop.run_until_complete(open_connection_fut)
      46          writer.write(b'GET / HTTP/1.0\r\n\r\n')
      47          f = reader.readline()
      48          data = self.loop.run_until_complete(f)
      49          self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
      50          f = reader.read()
      51          data = self.loop.run_until_complete(f)
      52          self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
      53          writer.close()
      54          self.assertEqual(messages, [])
      55  
      56      def test_open_connection(self):
      57          with test_utils.run_test_server() as httpd:
      58              conn_fut = asyncio.open_connection(*httpd.address)
      59              self._basetest_open_connection(conn_fut)
      60  
      61      @socket_helper.skip_unless_bind_unix_socket
      62      def test_open_unix_connection(self):
      63          with test_utils.run_test_unix_server() as httpd:
      64              conn_fut = asyncio.open_unix_connection(httpd.address)
      65              self._basetest_open_connection(conn_fut)
      66  
      67      def _basetest_open_connection_no_loop_ssl(self, open_connection_fut):
      68          messages = []
      69          self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
      70          try:
      71              reader, writer = self.loop.run_until_complete(open_connection_fut)
      72          finally:
      73              asyncio.set_event_loop(None)
      74          writer.write(b'GET / HTTP/1.0\r\n\r\n')
      75          f = reader.read()
      76          data = self.loop.run_until_complete(f)
      77          self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
      78  
      79          writer.close()
      80          self.assertEqual(messages, [])
      81  
      82      @unittest.skipIf(ssl is None, 'No ssl module')
      83      def test_open_connection_no_loop_ssl(self):
      84          with test_utils.run_test_server(use_ssl=True) as httpd:
      85              conn_fut = asyncio.open_connection(
      86                  *httpd.address,
      87                  ssl=test_utils.dummy_ssl_context())
      88  
      89              self._basetest_open_connection_no_loop_ssl(conn_fut)
      90  
      91      @socket_helper.skip_unless_bind_unix_socket
      92      @unittest.skipIf(ssl is None, 'No ssl module')
      93      def test_open_unix_connection_no_loop_ssl(self):
      94          with test_utils.run_test_unix_server(use_ssl=True) as httpd:
      95              conn_fut = asyncio.open_unix_connection(
      96                  httpd.address,
      97                  ssl=test_utils.dummy_ssl_context(),
      98                  server_hostname='',
      99              )
     100  
     101              self._basetest_open_connection_no_loop_ssl(conn_fut)
     102  
     103      def _basetest_open_connection_error(self, open_connection_fut):
     104          messages = []
     105          self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
     106          reader, writer = self.loop.run_until_complete(open_connection_fut)
     107          writer._protocol.connection_lost(ZeroDivisionError())
     108          f = reader.read()
     109          with self.assertRaises(ZeroDivisionError):
     110              self.loop.run_until_complete(f)
     111          writer.close()
     112          test_utils.run_briefly(self.loop)
     113          self.assertEqual(messages, [])
     114  
     115      def test_open_connection_error(self):
     116          with test_utils.run_test_server() as httpd:
     117              conn_fut = asyncio.open_connection(*httpd.address)
     118              self._basetest_open_connection_error(conn_fut)
     119  
     120      @socket_helper.skip_unless_bind_unix_socket
     121      def test_open_unix_connection_error(self):
     122          with test_utils.run_test_unix_server() as httpd:
     123              conn_fut = asyncio.open_unix_connection(httpd.address)
     124              self._basetest_open_connection_error(conn_fut)
     125  
     126      def test_feed_empty_data(self):
     127          stream = asyncio.StreamReader(loop=self.loop)
     128  
     129          stream.feed_data(b'')
     130          self.assertEqual(b'', stream._buffer)
     131  
     132      def test_feed_nonempty_data(self):
     133          stream = asyncio.StreamReader(loop=self.loop)
     134  
     135          stream.feed_data(self.DATA)
     136          self.assertEqual(self.DATA, stream._buffer)
     137  
     138      def test_read_zero(self):
     139          # Read zero bytes.
     140          stream = asyncio.StreamReader(loop=self.loop)
     141          stream.feed_data(self.DATA)
     142  
     143          data = self.loop.run_until_complete(stream.read(0))
     144          self.assertEqual(b'', data)
     145          self.assertEqual(self.DATA, stream._buffer)
     146  
     147      def test_read(self):
     148          # Read bytes.
     149          stream = asyncio.StreamReader(loop=self.loop)
     150          read_task = self.loop.create_task(stream.read(30))
     151  
     152          def cb():
     153              stream.feed_data(self.DATA)
     154          self.loop.call_soon(cb)
     155  
     156          data = self.loop.run_until_complete(read_task)
     157          self.assertEqual(self.DATA, data)
     158          self.assertEqual(b'', stream._buffer)
     159  
     160      def test_read_line_breaks(self):
     161          # Read bytes without line breaks.
     162          stream = asyncio.StreamReader(loop=self.loop)
     163          stream.feed_data(b'line1')
     164          stream.feed_data(b'line2')
     165  
     166          data = self.loop.run_until_complete(stream.read(5))
     167  
     168          self.assertEqual(b'line1', data)
     169          self.assertEqual(b'line2', stream._buffer)
     170  
     171      def test_read_eof(self):
     172          # Read bytes, stop at eof.
     173          stream = asyncio.StreamReader(loop=self.loop)
     174          read_task = self.loop.create_task(stream.read(1024))
     175  
     176          def cb():
     177              stream.feed_eof()
     178          self.loop.call_soon(cb)
     179  
     180          data = self.loop.run_until_complete(read_task)
     181          self.assertEqual(b'', data)
     182          self.assertEqual(b'', stream._buffer)
     183  
     184      def test_read_until_eof(self):
     185          # Read all bytes until eof.
     186          stream = asyncio.StreamReader(loop=self.loop)
     187          read_task = self.loop.create_task(stream.read(-1))
     188  
     189          def cb():
     190              stream.feed_data(b'chunk1\n')
     191              stream.feed_data(b'chunk2')
     192              stream.feed_eof()
     193          self.loop.call_soon(cb)
     194  
     195          data = self.loop.run_until_complete(read_task)
     196  
     197          self.assertEqual(b'chunk1\nchunk2', data)
     198          self.assertEqual(b'', stream._buffer)
     199  
     200      def test_read_exception(self):
     201          stream = asyncio.StreamReader(loop=self.loop)
     202          stream.feed_data(b'line\n')
     203  
     204          data = self.loop.run_until_complete(stream.read(2))
     205          self.assertEqual(b'li', data)
     206  
     207          stream.set_exception(ValueError())
     208          self.assertRaises(
     209              ValueError, self.loop.run_until_complete, stream.read(2))
     210  
     211      def test_invalid_limit(self):
     212          with self.assertRaisesRegex(ValueError, 'imit'):
     213              asyncio.StreamReader(limit=0, loop=self.loop)
     214  
     215          with self.assertRaisesRegex(ValueError, 'imit'):
     216              asyncio.StreamReader(limit=-1, loop=self.loop)
     217  
     218      def test_read_limit(self):
     219          stream = asyncio.StreamReader(limit=3, loop=self.loop)
     220          stream.feed_data(b'chunk')
     221          data = self.loop.run_until_complete(stream.read(5))
     222          self.assertEqual(b'chunk', data)
     223          self.assertEqual(b'', stream._buffer)
     224  
     225      def test_readline(self):
     226          # Read one line. 'readline' will need to wait for the data
     227          # to come from 'cb'
     228          stream = asyncio.StreamReader(loop=self.loop)
     229          stream.feed_data(b'chunk1 ')
     230          read_task = self.loop.create_task(stream.readline())
     231  
     232          def cb():
     233              stream.feed_data(b'chunk2 ')
     234              stream.feed_data(b'chunk3 ')
     235              stream.feed_data(b'\n chunk4')
     236          self.loop.call_soon(cb)
     237  
     238          line = self.loop.run_until_complete(read_task)
     239          self.assertEqual(b'chunk1 chunk2 chunk3 \n', line)
     240          self.assertEqual(b' chunk4', stream._buffer)
     241  
     242      def test_readline_limit_with_existing_data(self):
     243          # Read one line. The data is in StreamReader's buffer
     244          # before the event loop is run.
     245  
     246          stream = asyncio.StreamReader(limit=3, loop=self.loop)
     247          stream.feed_data(b'li')
     248          stream.feed_data(b'ne1\nline2\n')
     249  
     250          self.assertRaises(
     251              ValueError, self.loop.run_until_complete, stream.readline())
     252          # The buffer should contain the remaining data after exception
     253          self.assertEqual(b'line2\n', stream._buffer)
     254  
     255          stream = asyncio.StreamReader(limit=3, loop=self.loop)
     256          stream.feed_data(b'li')
     257          stream.feed_data(b'ne1')
     258          stream.feed_data(b'li')
     259  
     260          self.assertRaises(
     261              ValueError, self.loop.run_until_complete, stream.readline())
     262          # No b'\n' at the end. The 'limit' is set to 3. So before
     263          # waiting for the new data in buffer, 'readline' will consume
     264          # the entire buffer, and since the length of the consumed data
     265          # is more than 3, it will raise a ValueError. The buffer is
     266          # expected to be empty now.
     267          self.assertEqual(b'', stream._buffer)
     268  
     269      def test_at_eof(self):
     270          stream = asyncio.StreamReader(loop=self.loop)
     271          self.assertFalse(stream.at_eof())
     272  
     273          stream.feed_data(b'some data\n')
     274          self.assertFalse(stream.at_eof())
     275  
     276          self.loop.run_until_complete(stream.readline())
     277          self.assertFalse(stream.at_eof())
     278  
     279          stream.feed_data(b'some data\n')
     280          stream.feed_eof()
     281          self.loop.run_until_complete(stream.readline())
     282          self.assertTrue(stream.at_eof())
     283  
     284      def test_readline_limit(self):
     285          # Read one line. StreamReaders are fed with data after
     286          # their 'readline' methods are called.
     287  
     288          stream = asyncio.StreamReader(limit=7, loop=self.loop)
     289          def cb():
     290              stream.feed_data(b'chunk1')
     291              stream.feed_data(b'chunk2')
     292              stream.feed_data(b'chunk3\n')
     293              stream.feed_eof()
     294          self.loop.call_soon(cb)
     295  
     296          self.assertRaises(
     297              ValueError, self.loop.run_until_complete, stream.readline())
     298          # The buffer had just one line of data, and after raising
     299          # a ValueError it should be empty.
     300          self.assertEqual(b'', stream._buffer)
     301  
     302          stream = asyncio.StreamReader(limit=7, loop=self.loop)
     303          def cb():
     304              stream.feed_data(b'chunk1')
     305              stream.feed_data(b'chunk2\n')
     306              stream.feed_data(b'chunk3\n')
     307              stream.feed_eof()
     308          self.loop.call_soon(cb)
     309  
     310          self.assertRaises(
     311              ValueError, self.loop.run_until_complete, stream.readline())
     312          self.assertEqual(b'chunk3\n', stream._buffer)
     313  
     314          # check strictness of the limit
     315          stream = asyncio.StreamReader(limit=7, loop=self.loop)
     316          stream.feed_data(b'1234567\n')
     317          line = self.loop.run_until_complete(stream.readline())
     318          self.assertEqual(b'1234567\n', line)
     319          self.assertEqual(b'', stream._buffer)
     320  
     321          stream.feed_data(b'12345678\n')
     322          with self.assertRaises(ValueError) as cm:
     323              self.loop.run_until_complete(stream.readline())
     324          self.assertEqual(b'', stream._buffer)
     325  
     326          stream.feed_data(b'12345678')
     327          with self.assertRaises(ValueError) as cm:
     328              self.loop.run_until_complete(stream.readline())
     329          self.assertEqual(b'', stream._buffer)
     330  
     331      def test_readline_nolimit_nowait(self):
     332          # All needed data for the first 'readline' call will be
     333          # in the buffer.
     334          stream = asyncio.StreamReader(loop=self.loop)
     335          stream.feed_data(self.DATA[:6])
     336          stream.feed_data(self.DATA[6:])
     337  
     338          line = self.loop.run_until_complete(stream.readline())
     339  
     340          self.assertEqual(b'line1\n', line)
     341          self.assertEqual(b'line2\nline3\n', stream._buffer)
     342  
     343      def test_readline_eof(self):
     344          stream = asyncio.StreamReader(loop=self.loop)
     345          stream.feed_data(b'some data')
     346          stream.feed_eof()
     347  
     348          line = self.loop.run_until_complete(stream.readline())
     349          self.assertEqual(b'some data', line)
     350  
     351      def test_readline_empty_eof(self):
     352          stream = asyncio.StreamReader(loop=self.loop)
     353          stream.feed_eof()
     354  
     355          line = self.loop.run_until_complete(stream.readline())
     356          self.assertEqual(b'', line)
     357  
     358      def test_readline_read_byte_count(self):
     359          stream = asyncio.StreamReader(loop=self.loop)
     360          stream.feed_data(self.DATA)
     361  
     362          self.loop.run_until_complete(stream.readline())
     363  
     364          data = self.loop.run_until_complete(stream.read(7))
     365  
     366          self.assertEqual(b'line2\nl', data)
     367          self.assertEqual(b'ine3\n', stream._buffer)
     368  
     369      def test_readline_exception(self):
     370          stream = asyncio.StreamReader(loop=self.loop)
     371          stream.feed_data(b'line\n')
     372  
     373          data = self.loop.run_until_complete(stream.readline())
     374          self.assertEqual(b'line\n', data)
     375  
     376          stream.set_exception(ValueError())
     377          self.assertRaises(
     378              ValueError, self.loop.run_until_complete, stream.readline())
     379          self.assertEqual(b'', stream._buffer)
     380  
     381      def test_readuntil_separator(self):
     382          stream = asyncio.StreamReader(loop=self.loop)
     383          with self.assertRaisesRegex(ValueError, 'Separator should be'):
     384              self.loop.run_until_complete(stream.readuntil(separator=b''))
     385  
     386      def test_readuntil_multi_chunks(self):
     387          stream = asyncio.StreamReader(loop=self.loop)
     388  
     389          stream.feed_data(b'lineAAA')
     390          data = self.loop.run_until_complete(stream.readuntil(separator=b'AAA'))
     391          self.assertEqual(b'lineAAA', data)
     392          self.assertEqual(b'', stream._buffer)
     393  
     394          stream.feed_data(b'lineAAA')
     395          data = self.loop.run_until_complete(stream.readuntil(b'AAA'))
     396          self.assertEqual(b'lineAAA', data)
     397          self.assertEqual(b'', stream._buffer)
     398  
     399          stream.feed_data(b'lineAAAxxx')
     400          data = self.loop.run_until_complete(stream.readuntil(b'AAA'))
     401          self.assertEqual(b'lineAAA', data)
     402          self.assertEqual(b'xxx', stream._buffer)
     403  
     404      def test_readuntil_multi_chunks_1(self):
     405          stream = asyncio.StreamReader(loop=self.loop)
     406  
     407          stream.feed_data(b'QWEaa')
     408          stream.feed_data(b'XYaa')
     409          stream.feed_data(b'a')
     410          data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
     411          self.assertEqual(b'QWEaaXYaaa', data)
     412          self.assertEqual(b'', stream._buffer)
     413  
     414          stream.feed_data(b'QWEaa')
     415          stream.feed_data(b'XYa')
     416          stream.feed_data(b'aa')
     417          data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
     418          self.assertEqual(b'QWEaaXYaaa', data)
     419          self.assertEqual(b'', stream._buffer)
     420  
     421          stream.feed_data(b'aaa')
     422          data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
     423          self.assertEqual(b'aaa', data)
     424          self.assertEqual(b'', stream._buffer)
     425  
     426          stream.feed_data(b'Xaaa')
     427          data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
     428          self.assertEqual(b'Xaaa', data)
     429          self.assertEqual(b'', stream._buffer)
     430  
     431          stream.feed_data(b'XXX')
     432          stream.feed_data(b'a')
     433          stream.feed_data(b'a')
     434          stream.feed_data(b'a')
     435          data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
     436          self.assertEqual(b'XXXaaa', data)
     437          self.assertEqual(b'', stream._buffer)
     438  
     439      def test_readuntil_eof(self):
     440          stream = asyncio.StreamReader(loop=self.loop)
     441          data = b'some dataAA'
     442          stream.feed_data(data)
     443          stream.feed_eof()
     444  
     445          with self.assertRaisesRegex(asyncio.IncompleteReadError,
     446                                      'undefined expected bytes') as cm:
     447              self.loop.run_until_complete(stream.readuntil(b'AAA'))
     448          self.assertEqual(cm.exception.partial, data)
     449          self.assertIsNone(cm.exception.expected)
     450          self.assertEqual(b'', stream._buffer)
     451  
     452      def test_readuntil_limit_found_sep(self):
     453          stream = asyncio.StreamReader(loop=self.loop, limit=3)
     454          stream.feed_data(b'some dataAA')
     455          with self.assertRaisesRegex(asyncio.LimitOverrunError,
     456                                      'not found') as cm:
     457              self.loop.run_until_complete(stream.readuntil(b'AAA'))
     458  
     459          self.assertEqual(b'some dataAA', stream._buffer)
     460  
     461          stream.feed_data(b'A')
     462          with self.assertRaisesRegex(asyncio.LimitOverrunError,
     463                                      'is found') as cm:
     464              self.loop.run_until_complete(stream.readuntil(b'AAA'))
     465  
     466          self.assertEqual(b'some dataAAA', stream._buffer)
     467  
     468      def test_readexactly_zero_or_less(self):
     469          # Read exact number of bytes (zero or less).
     470          stream = asyncio.StreamReader(loop=self.loop)
     471          stream.feed_data(self.DATA)
     472  
     473          data = self.loop.run_until_complete(stream.readexactly(0))
     474          self.assertEqual(b'', data)
     475          self.assertEqual(self.DATA, stream._buffer)
     476  
     477          with self.assertRaisesRegex(ValueError, 'less than zero'):
     478              self.loop.run_until_complete(stream.readexactly(-1))
     479          self.assertEqual(self.DATA, stream._buffer)
     480  
     481      def test_readexactly(self):
     482          # Read exact number of bytes.
     483          stream = asyncio.StreamReader(loop=self.loop)
     484  
     485          n = 2 * len(self.DATA)
     486          read_task = self.loop.create_task(stream.readexactly(n))
     487  
     488          def cb():
     489              stream.feed_data(self.DATA)
     490              stream.feed_data(self.DATA)
     491              stream.feed_data(self.DATA)
     492          self.loop.call_soon(cb)
     493  
     494          data = self.loop.run_until_complete(read_task)
     495          self.assertEqual(self.DATA + self.DATA, data)
     496          self.assertEqual(self.DATA, stream._buffer)
     497  
     498      def test_readexactly_limit(self):
     499          stream = asyncio.StreamReader(limit=3, loop=self.loop)
     500          stream.feed_data(b'chunk')
     501          data = self.loop.run_until_complete(stream.readexactly(5))
     502          self.assertEqual(b'chunk', data)
     503          self.assertEqual(b'', stream._buffer)
     504  
     505      def test_readexactly_eof(self):
     506          # Read exact number of bytes (eof).
     507          stream = asyncio.StreamReader(loop=self.loop)
     508          n = 2 * len(self.DATA)
     509          read_task = self.loop.create_task(stream.readexactly(n))
     510  
     511          def cb():
     512              stream.feed_data(self.DATA)
     513              stream.feed_eof()
     514          self.loop.call_soon(cb)
     515  
     516          with self.assertRaises(asyncio.IncompleteReadError) as cm:
     517              self.loop.run_until_complete(read_task)
     518          self.assertEqual(cm.exception.partial, self.DATA)
     519          self.assertEqual(cm.exception.expected, n)
     520          self.assertEqual(str(cm.exception),
     521                           '18 bytes read on a total of 36 expected bytes')
     522          self.assertEqual(b'', stream._buffer)
     523  
     524      def test_readexactly_exception(self):
     525          stream = asyncio.StreamReader(loop=self.loop)
     526          stream.feed_data(b'line\n')
     527  
     528          data = self.loop.run_until_complete(stream.readexactly(2))
     529          self.assertEqual(b'li', data)
     530  
     531          stream.set_exception(ValueError())
     532          self.assertRaises(
     533              ValueError, self.loop.run_until_complete, stream.readexactly(2))
     534  
     535      def test_exception(self):
     536          stream = asyncio.StreamReader(loop=self.loop)
     537          self.assertIsNone(stream.exception())
     538  
     539          exc = ValueError()
     540          stream.set_exception(exc)
     541          self.assertIs(stream.exception(), exc)
     542  
     543      def test_exception_waiter(self):
     544          stream = asyncio.StreamReader(loop=self.loop)
     545  
     546          async def set_err():
     547              stream.set_exception(ValueError())
     548  
     549          t1 = self.loop.create_task(stream.readline())
     550          t2 = self.loop.create_task(set_err())
     551  
     552          self.loop.run_until_complete(asyncio.wait([t1, t2]))
     553  
     554          self.assertRaises(ValueError, t1.result)
     555  
     556      def test_exception_cancel(self):
     557          stream = asyncio.StreamReader(loop=self.loop)
     558  
     559          t = self.loop.create_task(stream.readline())
     560          test_utils.run_briefly(self.loop)
     561          t.cancel()
     562          test_utils.run_briefly(self.loop)
     563          # The following line fails if set_exception() isn't careful.
     564          stream.set_exception(RuntimeError('message'))
     565          test_utils.run_briefly(self.loop)
     566          self.assertIs(stream._waiter, None)
     567  
     568      def test_start_server(self):
     569  
     570          class ESC[4;38;5;81mMyServer:
     571  
     572              def __init__(self, loop):
     573                  self.server = None
     574                  self.loop = loop
     575  
     576              async def handle_client(self, client_reader, client_writer):
     577                  data = await client_reader.readline()
     578                  client_writer.write(data)
     579                  await client_writer.drain()
     580                  client_writer.close()
     581                  await client_writer.wait_closed()
     582  
     583              def start(self):
     584                  sock = socket.create_server(('127.0.0.1', 0))
     585                  self.server = self.loop.run_until_complete(
     586                      asyncio.start_server(self.handle_client,
     587                                           sock=sock))
     588                  return sock.getsockname()
     589  
     590              def handle_client_callback(self, client_reader, client_writer):
     591                  self.loop.create_task(self.handle_client(client_reader,
     592                                                           client_writer))
     593  
     594              def start_callback(self):
     595                  sock = socket.create_server(('127.0.0.1', 0))
     596                  addr = sock.getsockname()
     597                  sock.close()
     598                  self.server = self.loop.run_until_complete(
     599                      asyncio.start_server(self.handle_client_callback,
     600                                           host=addr[0], port=addr[1]))
     601                  return addr
     602  
     603              def stop(self):
     604                  if self.server is not None:
     605                      self.server.close()
     606                      self.loop.run_until_complete(self.server.wait_closed())
     607                      self.server = None
     608  
     609          async def client(addr):
     610              reader, writer = await asyncio.open_connection(*addr)
     611              # send a line
     612              writer.write(b"hello world!\n")
     613              # read it back
     614              msgback = await reader.readline()
     615              writer.close()
     616              await writer.wait_closed()
     617              return msgback
     618  
     619          messages = []
     620          self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
     621  
     622          # test the server variant with a coroutine as client handler
     623          server = MyServer(self.loop)
     624          addr = server.start()
     625          msg = self.loop.run_until_complete(self.loop.create_task(client(addr)))
     626          server.stop()
     627          self.assertEqual(msg, b"hello world!\n")
     628  
     629          # test the server variant with a callback as client handler
     630          server = MyServer(self.loop)
     631          addr = server.start_callback()
     632          msg = self.loop.run_until_complete(self.loop.create_task(client(addr)))
     633          server.stop()
     634          self.assertEqual(msg, b"hello world!\n")
     635  
     636          self.assertEqual(messages, [])
     637  
     638      @socket_helper.skip_unless_bind_unix_socket
     639      def test_start_unix_server(self):
     640  
     641          class ESC[4;38;5;81mMyServer:
     642  
     643              def __init__(self, loop, path):
     644                  self.server = None
     645                  self.loop = loop
     646                  self.path = path
     647  
     648              async def handle_client(self, client_reader, client_writer):
     649                  data = await client_reader.readline()
     650                  client_writer.write(data)
     651                  await client_writer.drain()
     652                  client_writer.close()
     653                  await client_writer.wait_closed()
     654  
     655              def start(self):
     656                  self.server = self.loop.run_until_complete(
     657                      asyncio.start_unix_server(self.handle_client,
     658                                                path=self.path))
     659  
     660              def handle_client_callback(self, client_reader, client_writer):
     661                  self.loop.create_task(self.handle_client(client_reader,
     662                                                           client_writer))
     663  
     664              def start_callback(self):
     665                  start = asyncio.start_unix_server(self.handle_client_callback,
     666                                                    path=self.path)
     667                  self.server = self.loop.run_until_complete(start)
     668  
     669              def stop(self):
     670                  if self.server is not None:
     671                      self.server.close()
     672                      self.loop.run_until_complete(self.server.wait_closed())
     673                      self.server = None
     674  
     675          async def client(path):
     676              reader, writer = await asyncio.open_unix_connection(path)
     677              # send a line
     678              writer.write(b"hello world!\n")
     679              # read it back
     680              msgback = await reader.readline()
     681              writer.close()
     682              await writer.wait_closed()
     683              return msgback
     684  
     685          messages = []
     686          self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
     687  
     688          # test the server variant with a coroutine as client handler
     689          with test_utils.unix_socket_path() as path:
     690              server = MyServer(self.loop, path)
     691              server.start()
     692              msg = self.loop.run_until_complete(
     693                  self.loop.create_task(client(path)))
     694              server.stop()
     695              self.assertEqual(msg, b"hello world!\n")
     696  
     697          # test the server variant with a callback as client handler
     698          with test_utils.unix_socket_path() as path:
     699              server = MyServer(self.loop, path)
     700              server.start_callback()
     701              msg = self.loop.run_until_complete(
     702                  self.loop.create_task(client(path)))
     703              server.stop()
     704              self.assertEqual(msg, b"hello world!\n")
     705  
     706          self.assertEqual(messages, [])
     707  
     708      @unittest.skipIf(ssl is None, 'No ssl module')
     709      def test_start_tls(self):
     710  
     711          class ESC[4;38;5;81mMyServer:
     712  
     713              def __init__(self, loop):
     714                  self.server = None
     715                  self.loop = loop
     716  
     717              async def handle_client(self, client_reader, client_writer):
     718                  data1 = await client_reader.readline()
     719                  client_writer.write(data1)
     720                  await client_writer.drain()
     721                  assert client_writer.get_extra_info('sslcontext') is None
     722                  await client_writer.start_tls(
     723                      test_utils.simple_server_sslcontext())
     724                  assert client_writer.get_extra_info('sslcontext') is not None
     725                  data2 = await client_reader.readline()
     726                  client_writer.write(data2)
     727                  await client_writer.drain()
     728                  client_writer.close()
     729                  await client_writer.wait_closed()
     730  
     731              def start(self):
     732                  sock = socket.create_server(('127.0.0.1', 0))
     733                  self.server = self.loop.run_until_complete(
     734                      asyncio.start_server(self.handle_client,
     735                                           sock=sock))
     736                  return sock.getsockname()
     737  
     738              def stop(self):
     739                  if self.server is not None:
     740                      self.server.close()
     741                      self.loop.run_until_complete(self.server.wait_closed())
     742                      self.server = None
     743  
     744          async def client(addr):
     745              reader, writer = await asyncio.open_connection(*addr)
     746              writer.write(b"hello world 1!\n")
     747              await writer.drain()
     748              msgback1 = await reader.readline()
     749              assert writer.get_extra_info('sslcontext') is None
     750              await writer.start_tls(test_utils.simple_client_sslcontext())
     751              assert writer.get_extra_info('sslcontext') is not None
     752              writer.write(b"hello world 2!\n")
     753              await writer.drain()
     754              msgback2 = await reader.readline()
     755              writer.close()
     756              await writer.wait_closed()
     757              return msgback1, msgback2
     758  
     759          messages = []
     760          self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
     761  
     762          server = MyServer(self.loop)
     763          addr = server.start()
     764          msg1, msg2 = self.loop.run_until_complete(client(addr))
     765          server.stop()
     766  
     767          self.assertEqual(messages, [])
     768          self.assertEqual(msg1, b"hello world 1!\n")
     769          self.assertEqual(msg2, b"hello world 2!\n")
     770  
     771      @unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
     772      def test_read_all_from_pipe_reader(self):
     773          # See asyncio issue 168.  This test is derived from the example
     774          # subprocess_attach_read_pipe.py, but we configure the
     775          # StreamReader's limit so that twice it is less than the size
     776          # of the data writer.  Also we must explicitly attach a child
     777          # watcher to the event loop.
     778  
     779          code = """\
     780  import os, sys
     781  fd = int(sys.argv[1])
     782  os.write(fd, b'data')
     783  os.close(fd)
     784  """
     785          rfd, wfd = os.pipe()
     786          args = [sys.executable, '-c', code, str(wfd)]
     787  
     788          pipe = open(rfd, 'rb', 0)
     789          reader = asyncio.StreamReader(loop=self.loop, limit=1)
     790          protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
     791          transport, _ = self.loop.run_until_complete(
     792              self.loop.connect_read_pipe(lambda: protocol, pipe))
     793  
     794          watcher = asyncio.SafeChildWatcher()
     795          watcher.attach_loop(self.loop)
     796          try:
     797              asyncio.set_child_watcher(watcher)
     798              create = asyncio.create_subprocess_exec(
     799                  *args,
     800                  pass_fds={wfd},
     801              )
     802              proc = self.loop.run_until_complete(create)
     803              self.loop.run_until_complete(proc.wait())
     804          finally:
     805              asyncio.set_child_watcher(None)
     806  
     807          os.close(wfd)
     808          data = self.loop.run_until_complete(reader.read(-1))
     809          self.assertEqual(data, b'data')
     810  
     811      def test_streamreader_constructor_without_loop(self):
     812          with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
     813              asyncio.StreamReader()
     814  
     815      def test_streamreader_constructor_use_running_loop(self):
     816          # asyncio issue #184: Ensure that StreamReaderProtocol constructor
     817          # retrieves the current loop if the loop parameter is not set
     818          async def test():
     819              return asyncio.StreamReader()
     820  
     821          reader = self.loop.run_until_complete(test())
     822          self.assertIs(reader._loop, self.loop)
     823  
     824      def test_streamreader_constructor_use_global_loop(self):
     825          # asyncio issue #184: Ensure that StreamReaderProtocol constructor
     826          # retrieves the current loop if the loop parameter is not set
     827          # Deprecated in 3.10, undeprecated in 3.11.1
     828          self.addCleanup(asyncio.set_event_loop, None)
     829          asyncio.set_event_loop(self.loop)
     830          reader = asyncio.StreamReader()
     831          self.assertIs(reader._loop, self.loop)
     832  
     833  
     834      def test_streamreaderprotocol_constructor_without_loop(self):
     835          reader = mock.Mock()
     836          with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
     837              asyncio.StreamReaderProtocol(reader)
     838  
     839      def test_streamreaderprotocol_constructor_use_running_loop(self):
     840          # asyncio issue #184: Ensure that StreamReaderProtocol constructor
     841          # retrieves the current loop if the loop parameter is not set
     842          reader = mock.Mock()
     843          async def test():
     844              return asyncio.StreamReaderProtocol(reader)
     845          protocol = self.loop.run_until_complete(test())
     846          self.assertIs(protocol._loop, self.loop)
     847  
     848      def test_streamreaderprotocol_constructor_use_global_loop(self):
     849          # asyncio issue #184: Ensure that StreamReaderProtocol constructor
     850          # retrieves the current loop if the loop parameter is not set
     851          # Deprecated in 3.10, undeprecated in 3.11.1
     852          self.addCleanup(asyncio.set_event_loop, None)
     853          asyncio.set_event_loop(self.loop)
     854          reader = mock.Mock()
     855          protocol = asyncio.StreamReaderProtocol(reader)
     856          self.assertIs(protocol._loop, self.loop)
     857  
     858      def test_multiple_drain(self):
     859          # See https://github.com/python/cpython/issues/74116
     860          drained = 0
     861  
     862          async def drainer(stream):
     863              nonlocal drained
     864              await stream._drain_helper()
     865              drained += 1
     866  
     867          async def main():
     868              loop = asyncio.get_running_loop()
     869              stream = asyncio.streams.FlowControlMixin(loop)
     870              stream.pause_writing()
     871              loop.call_later(0.1, stream.resume_writing)
     872              await asyncio.gather(*[drainer(stream) for _ in range(10)])
     873              self.assertEqual(drained, 10)
     874  
     875          self.loop.run_until_complete(main())
     876  
     877      def test_drain_raises(self):
     878          # See http://bugs.python.org/issue25441
     879  
     880          # This test should not use asyncio for the mock server; the
     881          # whole point of the test is to test for a bug in drain()
     882          # where it never gives up the event loop but the socket is
     883          # closed on the  server side.
     884  
     885          messages = []
     886          self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
     887          q = queue.Queue()
     888  
     889          def server():
     890              # Runs in a separate thread.
     891              with socket.create_server(('localhost', 0)) as sock:
     892                  addr = sock.getsockname()
     893                  q.put(addr)
     894                  clt, _ = sock.accept()
     895                  clt.close()
     896  
     897          async def client(host, port):
     898              reader, writer = await asyncio.open_connection(host, port)
     899  
     900              while True:
     901                  writer.write(b"foo\n")
     902                  await writer.drain()
     903  
     904          # Start the server thread and wait for it to be listening.
     905          thread = threading.Thread(target=server)
     906          thread.daemon = True
     907          thread.start()
     908          addr = q.get()
     909  
     910          # Should not be stuck in an infinite loop.
     911          with self.assertRaises((ConnectionResetError, ConnectionAbortedError,
     912                                  BrokenPipeError)):
     913              self.loop.run_until_complete(client(*addr))
     914  
     915          # Clean up the thread.  (Only on success; on failure, it may
     916          # be stuck in accept().)
     917          thread.join()
     918          self.assertEqual([], messages)
     919  
     920      def test___repr__(self):
     921          stream = asyncio.StreamReader(loop=self.loop)
     922          self.assertEqual("<StreamReader>", repr(stream))
     923  
     924      def test___repr__nondefault_limit(self):
     925          stream = asyncio.StreamReader(loop=self.loop, limit=123)
     926          self.assertEqual("<StreamReader limit=123>", repr(stream))
     927  
     928      def test___repr__eof(self):
     929          stream = asyncio.StreamReader(loop=self.loop)
     930          stream.feed_eof()
     931          self.assertEqual("<StreamReader eof>", repr(stream))
     932  
     933      def test___repr__data(self):
     934          stream = asyncio.StreamReader(loop=self.loop)
     935          stream.feed_data(b'data')
     936          self.assertEqual("<StreamReader 4 bytes>", repr(stream))
     937  
     938      def test___repr__exception(self):
     939          stream = asyncio.StreamReader(loop=self.loop)
     940          exc = RuntimeError()
     941          stream.set_exception(exc)
     942          self.assertEqual("<StreamReader exception=RuntimeError()>",
     943                           repr(stream))
     944  
     945      def test___repr__waiter(self):
     946          stream = asyncio.StreamReader(loop=self.loop)
     947          stream._waiter = asyncio.Future(loop=self.loop)
     948          self.assertRegex(
     949              repr(stream),
     950              r"<StreamReader waiter=<Future pending[\S ]*>>")
     951          stream._waiter.set_result(None)
     952          self.loop.run_until_complete(stream._waiter)
     953          stream._waiter = None
     954          self.assertEqual("<StreamReader>", repr(stream))
     955  
     956      def test___repr__transport(self):
     957          stream = asyncio.StreamReader(loop=self.loop)
     958          stream._transport = mock.Mock()
     959          stream._transport.__repr__ = mock.Mock()
     960          stream._transport.__repr__.return_value = "<Transport>"
     961          self.assertEqual("<StreamReader transport=<Transport>>", repr(stream))
     962  
     963      def test_IncompleteReadError_pickleable(self):
     964          e = asyncio.IncompleteReadError(b'abc', 10)
     965          for proto in range(pickle.HIGHEST_PROTOCOL + 1):
     966              with self.subTest(pickle_protocol=proto):
     967                  e2 = pickle.loads(pickle.dumps(e, protocol=proto))
     968                  self.assertEqual(str(e), str(e2))
     969                  self.assertEqual(e.partial, e2.partial)
     970                  self.assertEqual(e.expected, e2.expected)
     971  
     972      def test_LimitOverrunError_pickleable(self):
     973          e = asyncio.LimitOverrunError('message', 10)
     974          for proto in range(pickle.HIGHEST_PROTOCOL + 1):
     975              with self.subTest(pickle_protocol=proto):
     976                  e2 = pickle.loads(pickle.dumps(e, protocol=proto))
     977                  self.assertEqual(str(e), str(e2))
     978                  self.assertEqual(e.consumed, e2.consumed)
     979  
     980      def test_wait_closed_on_close(self):
     981          with test_utils.run_test_server() as httpd:
     982              rd, wr = self.loop.run_until_complete(
     983                  asyncio.open_connection(*httpd.address))
     984  
     985              wr.write(b'GET / HTTP/1.0\r\n\r\n')
     986              f = rd.readline()
     987              data = self.loop.run_until_complete(f)
     988              self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
     989              f = rd.read()
     990              data = self.loop.run_until_complete(f)
     991              self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
     992              self.assertFalse(wr.is_closing())
     993              wr.close()
     994              self.assertTrue(wr.is_closing())
     995              self.loop.run_until_complete(wr.wait_closed())
     996  
     997      def test_wait_closed_on_close_with_unread_data(self):
     998          with test_utils.run_test_server() as httpd:
     999              rd, wr = self.loop.run_until_complete(
    1000                  asyncio.open_connection(*httpd.address))
    1001  
    1002              wr.write(b'GET / HTTP/1.0\r\n\r\n')
    1003              f = rd.readline()
    1004              data = self.loop.run_until_complete(f)
    1005              self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
    1006              wr.close()
    1007              self.loop.run_until_complete(wr.wait_closed())
    1008  
    def test_async_writer_api(self):
        # Drive a full request/response cycle from inside a coroutine and
        # check that nothing reaches the loop's exception handler.
        async def inner(httpd):
            reader, writer = await asyncio.open_connection(*httpd.address)

            writer.write(b'GET / HTTP/1.0\r\n\r\n')
            status_line = await reader.readline()
            self.assertEqual(status_line, b'HTTP/1.0 200 OK\r\n')
            remainder = await reader.read()
            self.assertTrue(remainder.endswith(b'\r\n\r\nTest message'))

            writer.close()
            await writer.wait_closed()

        reported = []
        self.loop.set_exception_handler(lambda loop, ctx: reported.append(ctx))

        with test_utils.run_test_server() as httpd:
            self.loop.run_until_complete(inner(httpd))

        self.assertEqual(reported, [])
    1028  
    def test_async_writer_api_exception_after_close(self):
        # Writing and draining after close() must raise
        # ConnectionResetError without anything reaching the loop's
        # exception handler.
        async def inner(httpd):
            reader, writer = await asyncio.open_connection(*httpd.address)

            writer.write(b'GET / HTTP/1.0\r\n\r\n')
            status_line = await reader.readline()
            self.assertEqual(status_line, b'HTTP/1.0 200 OK\r\n')
            remainder = await reader.read()
            self.assertTrue(remainder.endswith(b'\r\n\r\nTest message'))

            writer.close()
            with self.assertRaises(ConnectionResetError):
                writer.write(b'data')
                await writer.drain()

        reported = []
        self.loop.set_exception_handler(lambda loop, ctx: reported.append(ctx))

        with test_utils.run_test_server() as httpd:
            self.loop.run_until_complete(inner(httpd))

        self.assertEqual(reported, [])
    1050  
    def test_eof_feed_when_closing_writer(self):
        # See http://bugs.python.org/issue35065
        # Once the writer is closed, the paired reader must be at EOF and
        # read() must return empty bytes.
        reported = []
        self.loop.set_exception_handler(lambda loop, ctx: reported.append(ctx))

        with test_utils.run_test_server() as httpd:
            connecting = asyncio.open_connection(*httpd.address)
            reader, writer = self.loop.run_until_complete(connecting)

            writer.close()
            self.loop.run_until_complete(writer.wait_closed())
            self.assertTrue(reader.at_eof())
            leftover = self.loop.run_until_complete(reader.read())
            self.assertEqual(leftover, b'')

        self.assertEqual(reported, [])
    1069  
    def test_unclosed_resource_warnings(self):
        # Deleting a StreamWriter that was never closed and forcing a
        # collection must emit exactly one ResourceWarning whose message
        # starts with "unclosed <StreamWriter".
        async def inner(httpd):
            reader, writer = await asyncio.open_connection(*httpd.address)

            writer.write(b'GET / HTTP/1.0\r\n\r\n')
            status_line = await reader.readline()
            self.assertEqual(status_line, b'HTTP/1.0 200 OK\r\n')
            remainder = await reader.read()
            self.assertTrue(remainder.endswith(b'\r\n\r\nTest message'))

            with self.assertWarns(ResourceWarning) as caught:
                del writer
                gc.collect()
                self.assertEqual(len(caught.warnings), 1)
                text = str(caught.warnings[0].message)
                self.assertTrue(text.startswith("unclosed <StreamWriter"))

        reported = []
        self.loop.set_exception_handler(lambda loop, ctx: reported.append(ctx))

        with test_utils.run_test_server() as httpd:
            self.loop.run_until_complete(inner(httpd))

        self.assertEqual(reported, [])
    1092  
    def test_loop_is_closed_resource_warnings(self):
        # Exercise StreamWriter clean-up when the event loop is stopped
        # from inside the coroutine that owns the writer.  The outer part
        # of the test expects run_until_complete() to raise RuntimeError
        # and expects nothing to reach the loop's exception handler.
        async def inner(httpd):
            rd, wr = await asyncio.open_connection(*httpd.address)

            # Complete a full HTTP exchange first.
            wr.write(b'GET / HTTP/1.0\r\n\r\n')
            data = await rd.readline()
            self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
            data = await rd.read()
            self.assertTrue(data.endswith(b'\r\n\r\nTest message'))

            # Make "loop is closed" occur first before "del wr" for this test.
            self.loop.stop()
            wr.close()
            # Yield to the loop until it reports being closed.
            # NOTE(review): since the loop was stopped just above, it is
            # unclear from this test alone whether the statements after
            # this wait are reached in the normal path -- confirm.
            while not self.loop.is_closed():
                await asyncio.sleep(0.0)

            # Dropping the writer here is expected to produce exactly one
            # ResourceWarning with the message "loop is closed".
            with self.assertWarns(ResourceWarning) as cm:
                del wr
                gc.collect()
                self.assertEqual(len(cm.warnings), 1)
                self.assertEqual("loop is closed", str(cm.warnings[0].message))

        # Collect every context passed to the loop's exception handler.
        messages = []
        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

        with test_utils.run_test_server() as httpd:
            with self.assertRaises(RuntimeError):
                # This exception is caused by `self.loop.stop()` as expected.
                self.loop.run_until_complete(inner(httpd))
            # Force a collection while the test server is still running.
            gc.collect()

        self.assertEqual(messages, [])
    1125  
    def test_unhandled_exceptions(self) -> None:
        # An exception raised by the client_connected_cb coroutine must be
        # reported to the loop's exception handler.
        port = socket_helper.find_unused_port()

        reported = []
        self.loop.set_exception_handler(lambda loop, ctx: reported.append(ctx))

        async def client():
            _reader, writer = await asyncio.open_connection('localhost', port)
            writer.write(b'test msg')
            await writer.drain()
            writer.close()
            await writer.wait_closed()

        async def main():
            async def handle_echo(reader, writer):
                raise Exception('test')

            starting = asyncio.start_server(handle_echo, 'localhost', port)
            server = await starting
            await server.start_serving()
            await client()
            server.close()
            await server.wait_closed()

        self.loop.run_until_complete(main())

        self.assertEqual(reported[0]['message'],
                         'Unhandled exception in client_connected_cb')
        # Break explicitly reference cycle
        reported = None
    1156  
    1157  
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()