python (3.12.0)

(root)/
lib/
python3.12/
test/
test_asyncio/
test_events.py
       1  """Tests for events.py."""
       2  
       3  import collections.abc
       4  import concurrent.futures
       5  import functools
       6  import io
       7  import multiprocessing
       8  import os
       9  import platform
      10  import re
      11  import signal
      12  import socket
      13  try:
      14      import ssl
      15  except ImportError:
      16      ssl = None
      17  import subprocess
      18  import sys
      19  import threading
      20  import time
      21  import types
      22  import errno
      23  import unittest
      24  from unittest import mock
      25  import weakref
      26  import warnings
      27  if sys.platform not in ('win32', 'vxworks'):
      28      import tty
      29  
      30  import asyncio
      31  from asyncio import coroutines
      32  from asyncio import events
      33  from asyncio import selector_events
      34  from multiprocessing.util import _cleanup_tests as multiprocessing_cleanup_tests
      35  from test.test_asyncio import utils as test_utils
      36  from test import support
      37  from test.support import socket_helper
      38  from test.support import threading_helper
      39  from test.support import ALWAYS_EQ, LARGEST, SMALLEST
      40  
      41  
      42  def tearDownModule():
      43      asyncio.set_event_loop_policy(None)
      44  
      45  
      46  def broken_unix_getsockname():
      47      """Return True if the platform is Mac OS 10.4 or older."""
      48      if sys.platform.startswith("aix"):
      49          return True
      50      elif sys.platform != 'darwin':
      51          return False
      52      version = platform.mac_ver()[0]
      53      version = tuple(map(int, version.split('.')))
      54      return version < (10, 5)
      55  
      56  
      57  def _test_get_event_loop_new_process__sub_proc():
      58      async def doit():
      59          return 'hello'
      60  
      61      loop = asyncio.new_event_loop()
      62      asyncio.set_event_loop(loop)
      63      return loop.run_until_complete(doit())
      64  
      65  
      66  class ESC[4;38;5;81mCoroLike:
      67      def send(self, v):
      68          pass
      69  
      70      def throw(self, *exc):
      71          pass
      72  
      73      def close(self):
      74          pass
      75  
      76      def __await__(self):
      77          pass
      78  
      79  
      80  class ESC[4;38;5;81mMyBaseProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mProtocol):
      81      connected = None
      82      done = None
      83  
      84      def __init__(self, loop=None):
      85          self.transport = None
      86          self.state = 'INITIAL'
      87          self.nbytes = 0
      88          if loop is not None:
      89              self.connected = loop.create_future()
      90              self.done = loop.create_future()
      91  
      92      def _assert_state(self, *expected):
      93          if self.state not in expected:
      94              raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
      95  
      96      def connection_made(self, transport):
      97          self.transport = transport
      98          self._assert_state('INITIAL')
      99          self.state = 'CONNECTED'
     100          if self.connected:
     101              self.connected.set_result(None)
     102  
     103      def data_received(self, data):
     104          self._assert_state('CONNECTED')
     105          self.nbytes += len(data)
     106  
     107      def eof_received(self):
     108          self._assert_state('CONNECTED')
     109          self.state = 'EOF'
     110  
     111      def connection_lost(self, exc):
     112          self._assert_state('CONNECTED', 'EOF')
     113          self.state = 'CLOSED'
     114          if self.done:
     115              self.done.set_result(None)
     116  
     117  
     118  class ESC[4;38;5;81mMyProto(ESC[4;38;5;149mMyBaseProto):
     119      def connection_made(self, transport):
     120          super().connection_made(transport)
     121          transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
     122  
     123  
     124  class ESC[4;38;5;81mMyDatagramProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mDatagramProtocol):
     125      done = None
     126  
     127      def __init__(self, loop=None):
     128          self.state = 'INITIAL'
     129          self.nbytes = 0
     130          if loop is not None:
     131              self.done = loop.create_future()
     132  
     133      def _assert_state(self, expected):
     134          if self.state != expected:
     135              raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
     136  
     137      def connection_made(self, transport):
     138          self.transport = transport
     139          self._assert_state('INITIAL')
     140          self.state = 'INITIALIZED'
     141  
     142      def datagram_received(self, data, addr):
     143          self._assert_state('INITIALIZED')
     144          self.nbytes += len(data)
     145  
     146      def error_received(self, exc):
     147          self._assert_state('INITIALIZED')
     148  
     149      def connection_lost(self, exc):
     150          self._assert_state('INITIALIZED')
     151          self.state = 'CLOSED'
     152          if self.done:
     153              self.done.set_result(None)
     154  
     155  
     156  class ESC[4;38;5;81mMyReadPipeProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mProtocol):
     157      done = None
     158  
     159      def __init__(self, loop=None):
     160          self.state = ['INITIAL']
     161          self.nbytes = 0
     162          self.transport = None
     163          if loop is not None:
     164              self.done = loop.create_future()
     165  
     166      def _assert_state(self, expected):
     167          if self.state != expected:
     168              raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
     169  
     170      def connection_made(self, transport):
     171          self.transport = transport
     172          self._assert_state(['INITIAL'])
     173          self.state.append('CONNECTED')
     174  
     175      def data_received(self, data):
     176          self._assert_state(['INITIAL', 'CONNECTED'])
     177          self.nbytes += len(data)
     178  
     179      def eof_received(self):
     180          self._assert_state(['INITIAL', 'CONNECTED'])
     181          self.state.append('EOF')
     182  
     183      def connection_lost(self, exc):
     184          if 'EOF' not in self.state:
     185              self.state.append('EOF')  # It is okay if EOF is missed.
     186          self._assert_state(['INITIAL', 'CONNECTED', 'EOF'])
     187          self.state.append('CLOSED')
     188          if self.done:
     189              self.done.set_result(None)
     190  
     191  
     192  class ESC[4;38;5;81mMyWritePipeProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mBaseProtocol):
     193      done = None
     194  
     195      def __init__(self, loop=None):
     196          self.state = 'INITIAL'
     197          self.transport = None
     198          if loop is not None:
     199              self.done = loop.create_future()
     200  
     201      def _assert_state(self, expected):
     202          if self.state != expected:
     203              raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
     204  
     205      def connection_made(self, transport):
     206          self.transport = transport
     207          self._assert_state('INITIAL')
     208          self.state = 'CONNECTED'
     209  
     210      def connection_lost(self, exc):
     211          self._assert_state('CONNECTED')
     212          self.state = 'CLOSED'
     213          if self.done:
     214              self.done.set_result(None)
     215  
     216  
     217  class ESC[4;38;5;81mMySubprocessProtocol(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mSubprocessProtocol):
     218  
     219      def __init__(self, loop):
     220          self.state = 'INITIAL'
     221          self.transport = None
     222          self.connected = loop.create_future()
     223          self.completed = loop.create_future()
     224          self.disconnects = {fd: loop.create_future() for fd in range(3)}
     225          self.data = {1: b'', 2: b''}
     226          self.returncode = None
     227          self.got_data = {1: asyncio.Event(),
     228                           2: asyncio.Event()}
     229  
     230      def _assert_state(self, expected):
     231          if self.state != expected:
     232              raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
     233  
     234      def connection_made(self, transport):
     235          self.transport = transport
     236          self._assert_state('INITIAL')
     237          self.state = 'CONNECTED'
     238          self.connected.set_result(None)
     239  
     240      def connection_lost(self, exc):
     241          self._assert_state('CONNECTED')
     242          self.state = 'CLOSED'
     243          self.completed.set_result(None)
     244  
     245      def pipe_data_received(self, fd, data):
     246          self._assert_state('CONNECTED')
     247          self.data[fd] += data
     248          self.got_data[fd].set()
     249  
     250      def pipe_connection_lost(self, fd, exc):
     251          self._assert_state('CONNECTED')
     252          if exc:
     253              self.disconnects[fd].set_exception(exc)
     254          else:
     255              self.disconnects[fd].set_result(exc)
     256  
     257      def process_exited(self):
     258          self._assert_state('CONNECTED')
     259          self.returncode = self.transport.get_returncode()
     260  
     261  
     262  class ESC[4;38;5;81mEventLoopTestsMixin:
     263  
     264      def setUp(self):
     265          super().setUp()
     266          self.loop = self.create_event_loop()
     267          self.set_event_loop(self.loop)
     268  
     269      def tearDown(self):
     270          # just in case if we have transport close callbacks
     271          if not self.loop.is_closed():
     272              test_utils.run_briefly(self.loop)
     273  
     274          self.doCleanups()
     275          support.gc_collect()
     276          super().tearDown()
     277  
     278      def test_run_until_complete_nesting(self):
     279          async def coro1():
     280              await asyncio.sleep(0)
     281  
     282          async def coro2():
     283              self.assertTrue(self.loop.is_running())
     284              self.loop.run_until_complete(coro1())
     285  
     286          with self.assertWarnsRegex(
     287              RuntimeWarning,
     288              r"coroutine \S+ was never awaited"
     289          ):
     290              self.assertRaises(
     291                  RuntimeError, self.loop.run_until_complete, coro2())
     292  
     293      # Note: because of the default Windows timing granularity of
     294      # 15.6 msec, we use fairly long sleep times here (~100 msec).
     295  
     296      def test_run_until_complete(self):
     297          t0 = self.loop.time()
     298          self.loop.run_until_complete(asyncio.sleep(0.1))
     299          t1 = self.loop.time()
     300          self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)
     301  
     302      def test_run_until_complete_stopped(self):
     303  
     304          async def cb():
     305              self.loop.stop()
     306              await asyncio.sleep(0.1)
     307          task = cb()
     308          self.assertRaises(RuntimeError,
     309                            self.loop.run_until_complete, task)
     310  
     311      def test_call_later(self):
     312          results = []
     313  
     314          def callback(arg):
     315              results.append(arg)
     316              self.loop.stop()
     317  
     318          self.loop.call_later(0.1, callback, 'hello world')
     319          self.loop.run_forever()
     320          self.assertEqual(results, ['hello world'])
     321  
     322      def test_call_soon(self):
     323          results = []
     324  
     325          def callback(arg1, arg2):
     326              results.append((arg1, arg2))
     327              self.loop.stop()
     328  
     329          self.loop.call_soon(callback, 'hello', 'world')
     330          self.loop.run_forever()
     331          self.assertEqual(results, [('hello', 'world')])
     332  
     333      def test_call_soon_threadsafe(self):
     334          results = []
     335          lock = threading.Lock()
     336  
     337          def callback(arg):
     338              results.append(arg)
     339              if len(results) >= 2:
     340                  self.loop.stop()
     341  
     342          def run_in_thread():
     343              self.loop.call_soon_threadsafe(callback, 'hello')
     344              lock.release()
     345  
     346          lock.acquire()
     347          t = threading.Thread(target=run_in_thread)
     348          t.start()
     349  
     350          with lock:
     351              self.loop.call_soon(callback, 'world')
     352              self.loop.run_forever()
     353          t.join()
     354          self.assertEqual(results, ['hello', 'world'])
     355  
     356      def test_call_soon_threadsafe_same_thread(self):
     357          results = []
     358  
     359          def callback(arg):
     360              results.append(arg)
     361              if len(results) >= 2:
     362                  self.loop.stop()
     363  
     364          self.loop.call_soon_threadsafe(callback, 'hello')
     365          self.loop.call_soon(callback, 'world')
     366          self.loop.run_forever()
     367          self.assertEqual(results, ['hello', 'world'])
     368  
     369      def test_run_in_executor(self):
     370          def run(arg):
     371              return (arg, threading.get_ident())
     372          f2 = self.loop.run_in_executor(None, run, 'yo')
     373          res, thread_id = self.loop.run_until_complete(f2)
     374          self.assertEqual(res, 'yo')
     375          self.assertNotEqual(thread_id, threading.get_ident())
     376  
     377      def test_run_in_executor_cancel(self):
     378          called = False
     379  
     380          def patched_call_soon(*args):
     381              nonlocal called
     382              called = True
     383  
     384          def run():
     385              time.sleep(0.05)
     386  
     387          f2 = self.loop.run_in_executor(None, run)
     388          f2.cancel()
     389          self.loop.run_until_complete(
     390                  self.loop.shutdown_default_executor())
     391          self.loop.close()
     392          self.loop.call_soon = patched_call_soon
     393          self.loop.call_soon_threadsafe = patched_call_soon
     394          time.sleep(0.4)
     395          self.assertFalse(called)
     396  
     397      def test_reader_callback(self):
     398          r, w = socket.socketpair()
     399          r.setblocking(False)
     400          bytes_read = bytearray()
     401  
     402          def reader():
     403              try:
     404                  data = r.recv(1024)
     405              except BlockingIOError:
     406                  # Spurious readiness notifications are possible
     407                  # at least on Linux -- see man select.
     408                  return
     409              if data:
     410                  bytes_read.extend(data)
     411              else:
     412                  self.assertTrue(self.loop.remove_reader(r.fileno()))
     413                  r.close()
     414  
     415          self.loop.add_reader(r.fileno(), reader)
     416          self.loop.call_soon(w.send, b'abc')
     417          test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3)
     418          self.loop.call_soon(w.send, b'def')
     419          test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6)
     420          self.loop.call_soon(w.close)
     421          self.loop.call_soon(self.loop.stop)
     422          self.loop.run_forever()
     423          self.assertEqual(bytes_read, b'abcdef')
     424  
     425      def test_writer_callback(self):
     426          r, w = socket.socketpair()
     427          w.setblocking(False)
     428  
     429          def writer(data):
     430              w.send(data)
     431              self.loop.stop()
     432  
     433          data = b'x' * 1024
     434          self.loop.add_writer(w.fileno(), writer, data)
     435          self.loop.run_forever()
     436  
     437          self.assertTrue(self.loop.remove_writer(w.fileno()))
     438          self.assertFalse(self.loop.remove_writer(w.fileno()))
     439  
     440          w.close()
     441          read = r.recv(len(data) * 2)
     442          r.close()
     443          self.assertEqual(read, data)
     444  
     445      @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL')
     446      def test_add_signal_handler(self):
     447          caught = 0
     448  
     449          def my_handler():
     450              nonlocal caught
     451              caught += 1
     452  
     453          # Check error behavior first.
     454          self.assertRaises(
     455              TypeError, self.loop.add_signal_handler, 'boom', my_handler)
     456          self.assertRaises(
     457              TypeError, self.loop.remove_signal_handler, 'boom')
     458          self.assertRaises(
     459              ValueError, self.loop.add_signal_handler, signal.NSIG+1,
     460              my_handler)
     461          self.assertRaises(
     462              ValueError, self.loop.remove_signal_handler, signal.NSIG+1)
     463          self.assertRaises(
     464              ValueError, self.loop.add_signal_handler, 0, my_handler)
     465          self.assertRaises(
     466              ValueError, self.loop.remove_signal_handler, 0)
     467          self.assertRaises(
     468              ValueError, self.loop.add_signal_handler, -1, my_handler)
     469          self.assertRaises(
     470              ValueError, self.loop.remove_signal_handler, -1)
     471          self.assertRaises(
     472              RuntimeError, self.loop.add_signal_handler, signal.SIGKILL,
     473              my_handler)
     474          # Removing SIGKILL doesn't raise, since we don't call signal().
     475          self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL))
     476          # Now set a handler and handle it.
     477          self.loop.add_signal_handler(signal.SIGINT, my_handler)
     478  
     479          os.kill(os.getpid(), signal.SIGINT)
     480          test_utils.run_until(self.loop, lambda: caught)
     481  
     482          # Removing it should restore the default handler.
     483          self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
     484          self.assertEqual(signal.getsignal(signal.SIGINT),
     485                           signal.default_int_handler)
     486          # Removing again returns False.
     487          self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT))
     488  
     489      @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
     490      @unittest.skipUnless(hasattr(signal, 'setitimer'),
     491                           'need signal.setitimer()')
     492      def test_signal_handling_while_selecting(self):
     493          # Test with a signal actually arriving during a select() call.
     494          caught = 0
     495  
     496          def my_handler():
     497              nonlocal caught
     498              caught += 1
     499              self.loop.stop()
     500  
     501          self.loop.add_signal_handler(signal.SIGALRM, my_handler)
     502  
     503          signal.setitimer(signal.ITIMER_REAL, 0.01, 0)  # Send SIGALRM once.
     504          self.loop.call_later(60, self.loop.stop)
     505          self.loop.run_forever()
     506          self.assertEqual(caught, 1)
     507  
     508      @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
     509      @unittest.skipUnless(hasattr(signal, 'setitimer'),
     510                           'need signal.setitimer()')
     511      def test_signal_handling_args(self):
     512          some_args = (42,)
     513          caught = 0
     514  
     515          def my_handler(*args):
     516              nonlocal caught
     517              caught += 1
     518              self.assertEqual(args, some_args)
     519              self.loop.stop()
     520  
     521          self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args)
     522  
     523          signal.setitimer(signal.ITIMER_REAL, 0.1, 0)  # Send SIGALRM once.
     524          self.loop.call_later(60, self.loop.stop)
     525          self.loop.run_forever()
     526          self.assertEqual(caught, 1)
     527  
     528      def _basetest_create_connection(self, connection_fut, check_sockname=True):
     529          tr, pr = self.loop.run_until_complete(connection_fut)
     530          self.assertIsInstance(tr, asyncio.Transport)
     531          self.assertIsInstance(pr, asyncio.Protocol)
     532          self.assertIs(pr.transport, tr)
     533          if check_sockname:
     534              self.assertIsNotNone(tr.get_extra_info('sockname'))
     535          self.loop.run_until_complete(pr.done)
     536          self.assertGreater(pr.nbytes, 0)
     537          tr.close()
     538  
     539      def test_create_connection(self):
     540          with test_utils.run_test_server() as httpd:
     541              conn_fut = self.loop.create_connection(
     542                  lambda: MyProto(loop=self.loop), *httpd.address)
     543              self._basetest_create_connection(conn_fut)
     544  
     545      @socket_helper.skip_unless_bind_unix_socket
     546      def test_create_unix_connection(self):
     547          # Issue #20682: On Mac OS X Tiger, getsockname() returns a
     548          # zero-length address for UNIX socket.
     549          check_sockname = not broken_unix_getsockname()
     550  
     551          with test_utils.run_test_unix_server() as httpd:
     552              conn_fut = self.loop.create_unix_connection(
     553                  lambda: MyProto(loop=self.loop), httpd.address)
     554              self._basetest_create_connection(conn_fut, check_sockname)
     555  
     556      def check_ssl_extra_info(self, client, check_sockname=True,
     557                               peername=None, peercert={}):
     558          if check_sockname:
     559              self.assertIsNotNone(client.get_extra_info('sockname'))
     560          if peername:
     561              self.assertEqual(peername,
     562                               client.get_extra_info('peername'))
     563          else:
     564              self.assertIsNotNone(client.get_extra_info('peername'))
     565          self.assertEqual(peercert,
     566                           client.get_extra_info('peercert'))
     567  
     568          # test SSL cipher
     569          cipher = client.get_extra_info('cipher')
     570          self.assertIsInstance(cipher, tuple)
     571          self.assertEqual(len(cipher), 3, cipher)
     572          self.assertIsInstance(cipher[0], str)
     573          self.assertIsInstance(cipher[1], str)
     574          self.assertIsInstance(cipher[2], int)
     575  
     576          # test SSL object
     577          sslobj = client.get_extra_info('ssl_object')
     578          self.assertIsNotNone(sslobj)
     579          self.assertEqual(sslobj.compression(),
     580                           client.get_extra_info('compression'))
     581          self.assertEqual(sslobj.cipher(),
     582                           client.get_extra_info('cipher'))
     583          self.assertEqual(sslobj.getpeercert(),
     584                           client.get_extra_info('peercert'))
     585          self.assertEqual(sslobj.compression(),
     586                           client.get_extra_info('compression'))
     587  
     588      def _basetest_create_ssl_connection(self, connection_fut,
     589                                          check_sockname=True,
     590                                          peername=None):
     591          tr, pr = self.loop.run_until_complete(connection_fut)
     592          self.assertIsInstance(tr, asyncio.Transport)
     593          self.assertIsInstance(pr, asyncio.Protocol)
     594          self.assertTrue('ssl' in tr.__class__.__name__.lower())
     595          self.check_ssl_extra_info(tr, check_sockname, peername)
     596          self.loop.run_until_complete(pr.done)
     597          self.assertGreater(pr.nbytes, 0)
     598          tr.close()
     599  
     600      def _test_create_ssl_connection(self, httpd, create_connection,
     601                                      check_sockname=True, peername=None):
     602          conn_fut = create_connection(ssl=test_utils.dummy_ssl_context())
     603          self._basetest_create_ssl_connection(conn_fut, check_sockname,
     604                                               peername)
     605  
     606          # ssl.Purpose was introduced in Python 3.4
     607          if hasattr(ssl, 'Purpose'):
     608              def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
     609                                            cafile=None, capath=None,
     610                                            cadata=None):
     611                  """
     612                  A ssl.create_default_context() replacement that doesn't enable
     613                  cert validation.
     614                  """
     615                  self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH)
     616                  return test_utils.dummy_ssl_context()
     617  
     618              # With ssl=True, ssl.create_default_context() should be called
     619              with mock.patch('ssl.create_default_context',
     620                              side_effect=_dummy_ssl_create_context) as m:
     621                  conn_fut = create_connection(ssl=True)
     622                  self._basetest_create_ssl_connection(conn_fut, check_sockname,
     623                                                       peername)
     624                  self.assertEqual(m.call_count, 1)
     625  
     626          # With the real ssl.create_default_context(), certificate
     627          # validation will fail
     628          with self.assertRaises(ssl.SSLError) as cm:
     629              conn_fut = create_connection(ssl=True)
     630              # Ignore the "SSL handshake failed" log in debug mode
     631              with test_utils.disable_logger():
     632                  self._basetest_create_ssl_connection(conn_fut, check_sockname,
     633                                                       peername)
     634  
     635          self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
     636  
     637      @unittest.skipIf(ssl is None, 'No ssl module')
     638      def test_create_ssl_connection(self):
     639          with test_utils.run_test_server(use_ssl=True) as httpd:
     640              create_connection = functools.partial(
     641                  self.loop.create_connection,
     642                  lambda: MyProto(loop=self.loop),
     643                  *httpd.address)
     644              self._test_create_ssl_connection(httpd, create_connection,
     645                                               peername=httpd.address)
     646  
     647      @socket_helper.skip_unless_bind_unix_socket
     648      @unittest.skipIf(ssl is None, 'No ssl module')
     649      def test_create_ssl_unix_connection(self):
     650          # Issue #20682: On Mac OS X Tiger, getsockname() returns a
     651          # zero-length address for UNIX socket.
     652          check_sockname = not broken_unix_getsockname()
     653  
     654          with test_utils.run_test_unix_server(use_ssl=True) as httpd:
     655              create_connection = functools.partial(
     656                  self.loop.create_unix_connection,
     657                  lambda: MyProto(loop=self.loop), httpd.address,
     658                  server_hostname='127.0.0.1')
     659  
     660              self._test_create_ssl_connection(httpd, create_connection,
     661                                               check_sockname,
     662                                               peername=httpd.address)
     663  
     664      def test_create_connection_local_addr(self):
     665          with test_utils.run_test_server() as httpd:
     666              port = socket_helper.find_unused_port()
     667              f = self.loop.create_connection(
     668                  lambda: MyProto(loop=self.loop),
     669                  *httpd.address, local_addr=(httpd.address[0], port))
     670              tr, pr = self.loop.run_until_complete(f)
     671              expected = pr.transport.get_extra_info('sockname')[1]
     672              self.assertEqual(port, expected)
     673              tr.close()
     674  
     675      @socket_helper.skip_if_tcp_blackhole
     676      def test_create_connection_local_addr_skip_different_family(self):
     677          # See https://github.com/python/cpython/issues/86508
     678          port1 = socket_helper.find_unused_port()
     679          port2 = socket_helper.find_unused_port()
     680          getaddrinfo_orig = self.loop.getaddrinfo
     681  
     682          async def getaddrinfo(host, port, *args, **kwargs):
     683              if port == port2:
     684                  return [(socket.AF_INET6, socket.SOCK_STREAM, 0, '', ('::1', 0, 0, 0)),
     685                          (socket.AF_INET, socket.SOCK_STREAM, 0, '', ('127.0.0.1', 0))]
     686              return await getaddrinfo_orig(host, port, *args, **kwargs)
     687  
     688          self.loop.getaddrinfo = getaddrinfo
     689  
     690          f = self.loop.create_connection(
     691              lambda: MyProto(loop=self.loop),
     692              'localhost', port1, local_addr=('localhost', port2))
     693  
     694          with self.assertRaises(OSError):
     695              self.loop.run_until_complete(f)
     696  
     697      @socket_helper.skip_if_tcp_blackhole
     698      def test_create_connection_local_addr_nomatch_family(self):
     699          # See https://github.com/python/cpython/issues/86508
     700          port1 = socket_helper.find_unused_port()
     701          port2 = socket_helper.find_unused_port()
     702          getaddrinfo_orig = self.loop.getaddrinfo
     703  
     704          async def getaddrinfo(host, port, *args, **kwargs):
     705              if port == port2:
     706                  return [(socket.AF_INET6, socket.SOCK_STREAM, 0, '', ('::1', 0, 0, 0))]
     707              return await getaddrinfo_orig(host, port, *args, **kwargs)
     708  
     709          self.loop.getaddrinfo = getaddrinfo
     710  
     711          f = self.loop.create_connection(
     712              lambda: MyProto(loop=self.loop),
     713              'localhost', port1, local_addr=('localhost', port2))
     714  
     715          with self.assertRaises(OSError):
     716              self.loop.run_until_complete(f)
     717  
     718      def test_create_connection_local_addr_in_use(self):
     719          with test_utils.run_test_server() as httpd:
     720              f = self.loop.create_connection(
     721                  lambda: MyProto(loop=self.loop),
     722                  *httpd.address, local_addr=httpd.address)
     723              with self.assertRaises(OSError) as cm:
     724                  self.loop.run_until_complete(f)
     725              self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
     726              self.assertIn(str(httpd.address), cm.exception.strerror)
     727  
     728      def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None):
     729          loop = self.loop
     730  
     731          class ESC[4;38;5;81mMyProto(ESC[4;38;5;149mMyBaseProto):
     732  
     733              def connection_lost(self, exc):
     734                  super().connection_lost(exc)
     735                  loop.call_soon(loop.stop)
     736  
     737              def data_received(self, data):
     738                  super().data_received(data)
     739                  self.transport.write(expected_response)
     740  
     741          lsock = socket.create_server(('127.0.0.1', 0), backlog=1)
     742          addr = lsock.getsockname()
     743  
     744          message = b'test data'
     745          response = None
     746          expected_response = b'roger'
     747  
     748          def client():
     749              nonlocal response
     750              try:
     751                  csock = socket.socket()
     752                  if client_ssl is not None:
     753                      csock = client_ssl.wrap_socket(csock)
     754                  csock.connect(addr)
     755                  csock.sendall(message)
     756                  response = csock.recv(99)
     757                  csock.close()
     758              except Exception as exc:
     759                  print(
     760                      "Failure in client thread in test_connect_accepted_socket",
     761                      exc)
     762  
     763          thread = threading.Thread(target=client, daemon=True)
     764          thread.start()
     765  
     766          conn, _ = lsock.accept()
     767          proto = MyProto(loop=loop)
     768          proto.loop = loop
     769          loop.run_until_complete(
     770              loop.connect_accepted_socket(
     771                  (lambda: proto), conn, ssl=server_ssl))
     772          loop.run_forever()
     773          proto.transport.close()
     774          lsock.close()
     775  
     776          threading_helper.join_thread(thread)
     777          self.assertFalse(thread.is_alive())
     778          self.assertEqual(proto.state, 'CLOSED')
     779          self.assertEqual(proto.nbytes, len(message))
     780          self.assertEqual(response, expected_response)
     781  
     782      @unittest.skipIf(ssl is None, 'No ssl module')
     783      def test_ssl_connect_accepted_socket(self):
     784          server_context = test_utils.simple_server_sslcontext()
     785          client_context = test_utils.simple_client_sslcontext()
     786  
     787          self.test_connect_accepted_socket(server_context, client_context)
     788  
     789      def test_connect_accepted_socket_ssl_timeout_for_plain_socket(self):
     790          sock = socket.socket()
     791          self.addCleanup(sock.close)
     792          coro = self.loop.connect_accepted_socket(
     793              MyProto, sock, ssl_handshake_timeout=support.LOOPBACK_TIMEOUT)
     794          with self.assertRaisesRegex(
     795                  ValueError,
     796                  'ssl_handshake_timeout is only meaningful with ssl'):
     797              self.loop.run_until_complete(coro)
     798  
     799      @mock.patch('asyncio.base_events.socket')
     800      def create_server_multiple_hosts(self, family, hosts, mock_sock):
     801          async def getaddrinfo(host, port, *args, **kw):
     802              if family == socket.AF_INET:
     803                  return [(family, socket.SOCK_STREAM, 6, '', (host, port))]
     804              else:
     805                  return [(family, socket.SOCK_STREAM, 6, '', (host, port, 0, 0))]
     806  
     807          def getaddrinfo_task(*args, **kwds):
     808              return self.loop.create_task(getaddrinfo(*args, **kwds))
     809  
     810          unique_hosts = set(hosts)
     811  
     812          if family == socket.AF_INET:
     813              mock_sock.socket().getsockbyname.side_effect = [
     814                  (host, 80) for host in unique_hosts]
     815          else:
     816              mock_sock.socket().getsockbyname.side_effect = [
     817                  (host, 80, 0, 0) for host in unique_hosts]
     818          self.loop.getaddrinfo = getaddrinfo_task
     819          self.loop._start_serving = mock.Mock()
     820          self.loop._stop_serving = mock.Mock()
     821          f = self.loop.create_server(lambda: MyProto(self.loop), hosts, 80)
     822          server = self.loop.run_until_complete(f)
     823          self.addCleanup(server.close)
     824          server_hosts = {sock.getsockbyname()[0] for sock in server.sockets}
     825          self.assertEqual(server_hosts, unique_hosts)
     826  
     827      def test_create_server_multiple_hosts_ipv4(self):
     828          self.create_server_multiple_hosts(socket.AF_INET,
     829                                            ['1.2.3.4', '5.6.7.8', '1.2.3.4'])
     830  
     831      def test_create_server_multiple_hosts_ipv6(self):
     832          self.create_server_multiple_hosts(socket.AF_INET6,
     833                                            ['::1', '::2', '::1'])
     834  
     835      def test_create_server(self):
     836          proto = MyProto(self.loop)
     837          f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
     838          server = self.loop.run_until_complete(f)
     839          self.assertEqual(len(server.sockets), 1)
     840          sock = server.sockets[0]
     841          host, port = sock.getsockname()
     842          self.assertEqual(host, '0.0.0.0')
     843          client = socket.socket()
     844          client.connect(('127.0.0.1', port))
     845          client.sendall(b'xxx')
     846  
     847          self.loop.run_until_complete(proto.connected)
     848          self.assertEqual('CONNECTED', proto.state)
     849  
     850          test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
     851          self.assertEqual(3, proto.nbytes)
     852  
     853          # extra info is available
     854          self.assertIsNotNone(proto.transport.get_extra_info('sockname'))
     855          self.assertEqual('127.0.0.1',
     856                           proto.transport.get_extra_info('peername')[0])
     857  
     858          # close connection
     859          proto.transport.close()
     860          self.loop.run_until_complete(proto.done)
     861  
     862          self.assertEqual('CLOSED', proto.state)
     863  
     864          # the client socket must be closed after to avoid ECONNRESET upon
     865          # recv()/send() on the serving socket
     866          client.close()
     867  
     868          # close server
     869          server.close()
     870  
     871      def test_create_server_trsock(self):
     872          proto = MyProto(self.loop)
     873          f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
     874          server = self.loop.run_until_complete(f)
     875          self.assertEqual(len(server.sockets), 1)
     876          sock = server.sockets[0]
     877          self.assertIsInstance(sock, asyncio.trsock.TransportSocket)
     878          host, port = sock.getsockname()
     879          self.assertEqual(host, '0.0.0.0')
     880          dup = sock.dup()
     881          self.addCleanup(dup.close)
     882          self.assertIsInstance(dup, socket.socket)
     883          self.assertFalse(sock.get_inheritable())
     884          with self.assertRaises(ValueError):
     885              sock.settimeout(1)
     886          sock.settimeout(0)
     887          self.assertEqual(sock.gettimeout(), 0)
     888          with self.assertRaises(ValueError):
     889              sock.setblocking(True)
     890          sock.setblocking(False)
     891          server.close()
     892  
     893  
     894      @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT')
     895      def test_create_server_reuse_port(self):
     896          proto = MyProto(self.loop)
     897          f = self.loop.create_server(
     898              lambda: proto, '0.0.0.0', 0)
     899          server = self.loop.run_until_complete(f)
     900          self.assertEqual(len(server.sockets), 1)
     901          sock = server.sockets[0]
     902          self.assertFalse(
     903              sock.getsockopt(
     904                  socket.SOL_SOCKET, socket.SO_REUSEPORT))
     905          server.close()
     906  
     907          test_utils.run_briefly(self.loop)
     908  
     909          proto = MyProto(self.loop)
     910          f = self.loop.create_server(
     911              lambda: proto, '0.0.0.0', 0, reuse_port=True)
     912          server = self.loop.run_until_complete(f)
     913          self.assertEqual(len(server.sockets), 1)
     914          sock = server.sockets[0]
     915          self.assertTrue(
     916              sock.getsockopt(
     917                  socket.SOL_SOCKET, socket.SO_REUSEPORT))
     918          server.close()
     919  
     920      def _make_unix_server(self, factory, **kwargs):
     921          path = test_utils.gen_unix_socket_path()
     922          self.addCleanup(lambda: os.path.exists(path) and os.unlink(path))
     923  
     924          f = self.loop.create_unix_server(factory, path, **kwargs)
     925          server = self.loop.run_until_complete(f)
     926  
     927          return server, path
     928  
     929      @socket_helper.skip_unless_bind_unix_socket
     930      def test_create_unix_server(self):
     931          proto = MyProto(loop=self.loop)
     932          server, path = self._make_unix_server(lambda: proto)
     933          self.assertEqual(len(server.sockets), 1)
     934  
     935          client = socket.socket(socket.AF_UNIX)
     936          client.connect(path)
     937          client.sendall(b'xxx')
     938  
     939          self.loop.run_until_complete(proto.connected)
     940          self.assertEqual('CONNECTED', proto.state)
     941          test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
     942          self.assertEqual(3, proto.nbytes)
     943  
     944          # close connection
     945          proto.transport.close()
     946          self.loop.run_until_complete(proto.done)
     947  
     948          self.assertEqual('CLOSED', proto.state)
     949  
     950          # the client socket must be closed after to avoid ECONNRESET upon
     951          # recv()/send() on the serving socket
     952          client.close()
     953  
     954          # close server
     955          server.close()
     956  
     957      @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
     958      def test_create_unix_server_path_socket_error(self):
     959          proto = MyProto(loop=self.loop)
     960          sock = socket.socket()
     961          with sock:
     962              f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock)
     963              with self.assertRaisesRegex(ValueError,
     964                                          'path and sock can not be specified '
     965                                          'at the same time'):
     966                  self.loop.run_until_complete(f)
     967  
     968      def _create_ssl_context(self, certfile, keyfile=None):
     969          sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
     970          sslcontext.options |= ssl.OP_NO_SSLv2
     971          sslcontext.load_cert_chain(certfile, keyfile)
     972          return sslcontext
     973  
     974      def _make_ssl_server(self, factory, certfile, keyfile=None):
     975          sslcontext = self._create_ssl_context(certfile, keyfile)
     976  
     977          f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext)
     978          server = self.loop.run_until_complete(f)
     979  
     980          sock = server.sockets[0]
     981          host, port = sock.getsockname()
     982          self.assertEqual(host, '127.0.0.1')
     983          return server, host, port
     984  
     985      def _make_ssl_unix_server(self, factory, certfile, keyfile=None):
     986          sslcontext = self._create_ssl_context(certfile, keyfile)
     987          return self._make_unix_server(factory, ssl=sslcontext)
     988  
     989      @unittest.skipIf(ssl is None, 'No ssl module')
     990      def test_create_server_ssl(self):
     991          proto = MyProto(loop=self.loop)
     992          server, host, port = self._make_ssl_server(
     993              lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)
     994  
     995          f_c = self.loop.create_connection(MyBaseProto, host, port,
     996                                            ssl=test_utils.dummy_ssl_context())
     997          client, pr = self.loop.run_until_complete(f_c)
     998  
     999          client.write(b'xxx')
    1000          self.loop.run_until_complete(proto.connected)
    1001          self.assertEqual('CONNECTED', proto.state)
    1002  
    1003          test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
    1004          self.assertEqual(3, proto.nbytes)
    1005  
    1006          # extra info is available
    1007          self.check_ssl_extra_info(client, peername=(host, port))
    1008  
    1009          # close connection
    1010          proto.transport.close()
    1011          self.loop.run_until_complete(proto.done)
    1012          self.assertEqual('CLOSED', proto.state)
    1013  
    1014          # the client socket must be closed after to avoid ECONNRESET upon
    1015          # recv()/send() on the serving socket
    1016          client.close()
    1017  
    1018          # stop serving
    1019          server.close()
    1020  
    1021      @socket_helper.skip_unless_bind_unix_socket
    1022      @unittest.skipIf(ssl is None, 'No ssl module')
    1023      def test_create_unix_server_ssl(self):
    1024          proto = MyProto(loop=self.loop)
    1025          server, path = self._make_ssl_unix_server(
    1026              lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)
    1027  
    1028          f_c = self.loop.create_unix_connection(
    1029              MyBaseProto, path, ssl=test_utils.dummy_ssl_context(),
    1030              server_hostname='')
    1031  
    1032          client, pr = self.loop.run_until_complete(f_c)
    1033  
    1034          client.write(b'xxx')
    1035          self.loop.run_until_complete(proto.connected)
    1036          self.assertEqual('CONNECTED', proto.state)
    1037          test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
    1038          self.assertEqual(3, proto.nbytes)
    1039  
    1040          # close connection
    1041          proto.transport.close()
    1042          self.loop.run_until_complete(proto.done)
    1043          self.assertEqual('CLOSED', proto.state)
    1044  
    1045          # the client socket must be closed after to avoid ECONNRESET upon
    1046          # recv()/send() on the serving socket
    1047          client.close()
    1048  
    1049          # stop serving
    1050          server.close()
    1051  
    1052      @unittest.skipIf(ssl is None, 'No ssl module')
    1053      def test_create_server_ssl_verify_failed(self):
    1054          proto = MyProto(loop=self.loop)
    1055          server, host, port = self._make_ssl_server(
    1056              lambda: proto, test_utils.SIGNED_CERTFILE)
    1057  
    1058          sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1059          sslcontext_client.options |= ssl.OP_NO_SSLv2
    1060          sslcontext_client.verify_mode = ssl.CERT_REQUIRED
    1061          if hasattr(sslcontext_client, 'check_hostname'):
    1062              sslcontext_client.check_hostname = True
    1063  
    1064  
    1065          # no CA loaded
    1066          f_c = self.loop.create_connection(MyProto, host, port,
    1067                                            ssl=sslcontext_client)
    1068          with mock.patch.object(self.loop, 'call_exception_handler'):
    1069              with test_utils.disable_logger():
    1070                  with self.assertRaisesRegex(ssl.SSLError,
    1071                                              '(?i)certificate.verify.failed'):
    1072                      self.loop.run_until_complete(f_c)
    1073  
    1074              # execute the loop to log the connection error
    1075              test_utils.run_briefly(self.loop)
    1076  
    1077          # close connection
    1078          self.assertIsNone(proto.transport)
    1079          server.close()
    1080  
    1081      @socket_helper.skip_unless_bind_unix_socket
    1082      @unittest.skipIf(ssl is None, 'No ssl module')
    1083      def test_create_unix_server_ssl_verify_failed(self):
    1084          proto = MyProto(loop=self.loop)
    1085          server, path = self._make_ssl_unix_server(
    1086              lambda: proto, test_utils.SIGNED_CERTFILE)
    1087  
    1088          sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1089          sslcontext_client.options |= ssl.OP_NO_SSLv2
    1090          sslcontext_client.verify_mode = ssl.CERT_REQUIRED
    1091          if hasattr(sslcontext_client, 'check_hostname'):
    1092              sslcontext_client.check_hostname = True
    1093  
    1094          # no CA loaded
    1095          f_c = self.loop.create_unix_connection(MyProto, path,
    1096                                                 ssl=sslcontext_client,
    1097                                                 server_hostname='invalid')
    1098          with mock.patch.object(self.loop, 'call_exception_handler'):
    1099              with test_utils.disable_logger():
    1100                  with self.assertRaisesRegex(ssl.SSLError,
    1101                                              '(?i)certificate.verify.failed'):
    1102                      self.loop.run_until_complete(f_c)
    1103  
    1104              # execute the loop to log the connection error
    1105              test_utils.run_briefly(self.loop)
    1106  
    1107          # close connection
    1108          self.assertIsNone(proto.transport)
    1109          server.close()
    1110  
    1111      @unittest.skipIf(ssl is None, 'No ssl module')
    1112      def test_create_server_ssl_match_failed(self):
    1113          proto = MyProto(loop=self.loop)
    1114          server, host, port = self._make_ssl_server(
    1115              lambda: proto, test_utils.SIGNED_CERTFILE)
    1116  
    1117          sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1118          sslcontext_client.options |= ssl.OP_NO_SSLv2
    1119          sslcontext_client.verify_mode = ssl.CERT_REQUIRED
    1120          sslcontext_client.load_verify_locations(
    1121              cafile=test_utils.SIGNING_CA)
    1122          if hasattr(sslcontext_client, 'check_hostname'):
    1123              sslcontext_client.check_hostname = True
    1124  
    1125          # incorrect server_hostname
    1126          f_c = self.loop.create_connection(MyProto, host, port,
    1127                                            ssl=sslcontext_client)
    1128          with mock.patch.object(self.loop, 'call_exception_handler'):
    1129              with test_utils.disable_logger():
    1130                  with self.assertRaisesRegex(
    1131                          ssl.CertificateError,
    1132                          "IP address mismatch, certificate is not valid for "
    1133                          "'127.0.0.1'"):
    1134                      self.loop.run_until_complete(f_c)
    1135  
    1136          # close connection
    1137          # transport is None because TLS ALERT aborted the handshake
    1138          self.assertIsNone(proto.transport)
    1139          server.close()
    1140  
    1141      @socket_helper.skip_unless_bind_unix_socket
    1142      @unittest.skipIf(ssl is None, 'No ssl module')
    1143      def test_create_unix_server_ssl_verified(self):
    1144          proto = MyProto(loop=self.loop)
    1145          server, path = self._make_ssl_unix_server(
    1146              lambda: proto, test_utils.SIGNED_CERTFILE)
    1147  
    1148          sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1149          sslcontext_client.options |= ssl.OP_NO_SSLv2
    1150          sslcontext_client.verify_mode = ssl.CERT_REQUIRED
    1151          sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
    1152          if hasattr(sslcontext_client, 'check_hostname'):
    1153              sslcontext_client.check_hostname = True
    1154  
    1155          # Connection succeeds with correct CA and server hostname.
    1156          f_c = self.loop.create_unix_connection(MyProto, path,
    1157                                                 ssl=sslcontext_client,
    1158                                                 server_hostname='localhost')
    1159          client, pr = self.loop.run_until_complete(f_c)
    1160          self.loop.run_until_complete(proto.connected)
    1161  
    1162          # close connection
    1163          proto.transport.close()
    1164          client.close()
    1165          server.close()
    1166          self.loop.run_until_complete(proto.done)
    1167  
    1168      @unittest.skipIf(ssl is None, 'No ssl module')
    1169      def test_create_server_ssl_verified(self):
    1170          proto = MyProto(loop=self.loop)
    1171          server, host, port = self._make_ssl_server(
    1172              lambda: proto, test_utils.SIGNED_CERTFILE)
    1173  
    1174          sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1175          sslcontext_client.options |= ssl.OP_NO_SSLv2
    1176          sslcontext_client.verify_mode = ssl.CERT_REQUIRED
    1177          sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
    1178          if hasattr(sslcontext_client, 'check_hostname'):
    1179              sslcontext_client.check_hostname = True
    1180  
    1181          # Connection succeeds with correct CA and server hostname.
    1182          f_c = self.loop.create_connection(MyProto, host, port,
    1183                                            ssl=sslcontext_client,
    1184                                            server_hostname='localhost')
    1185          client, pr = self.loop.run_until_complete(f_c)
    1186          self.loop.run_until_complete(proto.connected)
    1187  
    1188          # extra info is available
    1189          self.check_ssl_extra_info(client, peername=(host, port),
    1190                                    peercert=test_utils.PEERCERT)
    1191  
    1192          # close connection
    1193          proto.transport.close()
    1194          client.close()
    1195          server.close()
    1196          self.loop.run_until_complete(proto.done)
    1197  
    1198      def test_create_server_sock(self):
    1199          proto = self.loop.create_future()
    1200  
    1201          class ESC[4;38;5;81mTestMyProto(ESC[4;38;5;149mMyProto):
    1202              def connection_made(self, transport):
    1203                  super().connection_made(transport)
    1204                  proto.set_result(self)
    1205  
    1206          sock_ob = socket.create_server(('0.0.0.0', 0))
    1207  
    1208          f = self.loop.create_server(TestMyProto, sock=sock_ob)
    1209          server = self.loop.run_until_complete(f)
    1210          sock = server.sockets[0]
    1211          self.assertEqual(sock.fileno(), sock_ob.fileno())
    1212  
    1213          host, port = sock.getsockname()
    1214          self.assertEqual(host, '0.0.0.0')
    1215          client = socket.socket()
    1216          client.connect(('127.0.0.1', port))
    1217          client.send(b'xxx')
    1218          client.close()
    1219          server.close()
    1220  
    1221      def test_create_server_addr_in_use(self):
    1222          sock_ob = socket.create_server(('0.0.0.0', 0))
    1223  
    1224          f = self.loop.create_server(MyProto, sock=sock_ob)
    1225          server = self.loop.run_until_complete(f)
    1226          sock = server.sockets[0]
    1227          host, port = sock.getsockname()
    1228  
    1229          f = self.loop.create_server(MyProto, host=host, port=port)
    1230          with self.assertRaises(OSError) as cm:
    1231              self.loop.run_until_complete(f)
    1232          self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
    1233  
    1234          server.close()
    1235  
    1236      @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
    1237      def test_create_server_dual_stack(self):
    1238          f_proto = self.loop.create_future()
    1239  
    1240          class ESC[4;38;5;81mTestMyProto(ESC[4;38;5;149mMyProto):
    1241              def connection_made(self, transport):
    1242                  super().connection_made(transport)
    1243                  f_proto.set_result(self)
    1244  
    1245          try_count = 0
    1246          while True:
    1247              try:
    1248                  port = socket_helper.find_unused_port()
    1249                  f = self.loop.create_server(TestMyProto, host=None, port=port)
    1250                  server = self.loop.run_until_complete(f)
    1251              except OSError as ex:
    1252                  if ex.errno == errno.EADDRINUSE:
    1253                      try_count += 1
    1254                      self.assertGreaterEqual(5, try_count)
    1255                      continue
    1256                  else:
    1257                      raise
    1258              else:
    1259                  break
    1260          client = socket.socket()
    1261          client.connect(('127.0.0.1', port))
    1262          client.send(b'xxx')
    1263          proto = self.loop.run_until_complete(f_proto)
    1264          proto.transport.close()
    1265          client.close()
    1266  
    1267          f_proto = self.loop.create_future()
    1268          client = socket.socket(socket.AF_INET6)
    1269          client.connect(('::1', port))
    1270          client.send(b'xxx')
    1271          proto = self.loop.run_until_complete(f_proto)
    1272          proto.transport.close()
    1273          client.close()
    1274  
    1275          server.close()
    1276  
    1277      @socket_helper.skip_if_tcp_blackhole
    1278      def test_server_close(self):
    1279          f = self.loop.create_server(MyProto, '0.0.0.0', 0)
    1280          server = self.loop.run_until_complete(f)
    1281          sock = server.sockets[0]
    1282          host, port = sock.getsockname()
    1283  
    1284          client = socket.socket()
    1285          client.connect(('127.0.0.1', port))
    1286          client.send(b'xxx')
    1287          client.close()
    1288  
    1289          server.close()
    1290  
    1291          client = socket.socket()
    1292          self.assertRaises(
    1293              ConnectionRefusedError, client.connect, ('127.0.0.1', port))
    1294          client.close()
    1295  
    1296      def _test_create_datagram_endpoint(self, local_addr, family):
    1297          class ESC[4;38;5;81mTestMyDatagramProto(ESC[4;38;5;149mMyDatagramProto):
    1298              def __init__(inner_self):
    1299                  super().__init__(loop=self.loop)
    1300  
    1301              def datagram_received(self, data, addr):
    1302                  super().datagram_received(data, addr)
    1303                  self.transport.sendto(b'resp:'+data, addr)
    1304  
    1305          coro = self.loop.create_datagram_endpoint(
    1306              TestMyDatagramProto, local_addr=local_addr, family=family)
    1307          s_transport, server = self.loop.run_until_complete(coro)
    1308          sockname = s_transport.get_extra_info('sockname')
    1309          host, port = socket.getnameinfo(
    1310              sockname, socket.NI_NUMERICHOST|socket.NI_NUMERICSERV)
    1311  
    1312          self.assertIsInstance(s_transport, asyncio.Transport)
    1313          self.assertIsInstance(server, TestMyDatagramProto)
    1314          self.assertEqual('INITIALIZED', server.state)
    1315          self.assertIs(server.transport, s_transport)
    1316  
    1317          coro = self.loop.create_datagram_endpoint(
    1318              lambda: MyDatagramProto(loop=self.loop),
    1319              remote_addr=(host, port))
    1320          transport, client = self.loop.run_until_complete(coro)
    1321  
    1322          self.assertIsInstance(transport, asyncio.Transport)
    1323          self.assertIsInstance(client, MyDatagramProto)
    1324          self.assertEqual('INITIALIZED', client.state)
    1325          self.assertIs(client.transport, transport)
    1326  
    1327          transport.sendto(b'xxx')
    1328          test_utils.run_until(self.loop, lambda: server.nbytes)
    1329          self.assertEqual(3, server.nbytes)
    1330          test_utils.run_until(self.loop, lambda: client.nbytes)
    1331  
    1332          # received
    1333          self.assertEqual(8, client.nbytes)
    1334  
    1335          # extra info is available
    1336          self.assertIsNotNone(transport.get_extra_info('sockname'))
    1337  
    1338          # close connection
    1339          transport.close()
    1340          self.loop.run_until_complete(client.done)
    1341          self.assertEqual('CLOSED', client.state)
    1342          server.transport.close()
    1343  
    1344      def test_create_datagram_endpoint(self):
    1345          self._test_create_datagram_endpoint(('127.0.0.1', 0), socket.AF_INET)
    1346  
    1347      @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
    1348      def test_create_datagram_endpoint_ipv6(self):
    1349          self._test_create_datagram_endpoint(('::1', 0), socket.AF_INET6)
    1350  
    1351      def test_create_datagram_endpoint_sock(self):
    1352          sock = None
    1353          local_address = ('127.0.0.1', 0)
    1354          infos = self.loop.run_until_complete(
    1355              self.loop.getaddrinfo(
    1356                  *local_address, type=socket.SOCK_DGRAM))
    1357          for family, type, proto, cname, address in infos:
    1358              try:
    1359                  sock = socket.socket(family=family, type=type, proto=proto)
    1360                  sock.setblocking(False)
    1361                  sock.bind(address)
    1362              except:
    1363                  pass
    1364              else:
    1365                  break
    1366          else:
    1367              self.fail('Can not create socket.')
    1368  
    1369          f = self.loop.create_datagram_endpoint(
    1370              lambda: MyDatagramProto(loop=self.loop), sock=sock)
    1371          tr, pr = self.loop.run_until_complete(f)
    1372          self.assertIsInstance(tr, asyncio.Transport)
    1373          self.assertIsInstance(pr, MyDatagramProto)
    1374          tr.close()
    1375          self.loop.run_until_complete(pr.done)
    1376  
    1377      def test_internal_fds(self):
    1378          loop = self.create_event_loop()
    1379          if not isinstance(loop, selector_events.BaseSelectorEventLoop):
    1380              loop.close()
    1381              self.skipTest('loop is not a BaseSelectorEventLoop')
    1382  
    1383          self.assertEqual(1, loop._internal_fds)
    1384          loop.close()
    1385          self.assertEqual(0, loop._internal_fds)
    1386          self.assertIsNone(loop._csock)
    1387          self.assertIsNone(loop._ssock)
    1388  
    1389      @unittest.skipUnless(sys.platform != 'win32',
    1390                           "Don't support pipes for Windows")
    1391      def test_read_pipe(self):
    1392          proto = MyReadPipeProto(loop=self.loop)
    1393  
    1394          rpipe, wpipe = os.pipe()
    1395          pipeobj = io.open(rpipe, 'rb', 1024)
    1396  
    1397          async def connect():
    1398              t, p = await self.loop.connect_read_pipe(
    1399                  lambda: proto, pipeobj)
    1400              self.assertIs(p, proto)
    1401              self.assertIs(t, proto.transport)
    1402              self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
    1403              self.assertEqual(0, proto.nbytes)
    1404  
    1405          self.loop.run_until_complete(connect())
    1406  
    1407          os.write(wpipe, b'1')
    1408          test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
    1409          self.assertEqual(1, proto.nbytes)
    1410  
    1411          os.write(wpipe, b'2345')
    1412          test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
    1413          self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
    1414          self.assertEqual(5, proto.nbytes)
    1415  
    1416          os.close(wpipe)
    1417          self.loop.run_until_complete(proto.done)
    1418          self.assertEqual(
    1419              ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
    1420          # extra info is available
    1421          self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
    1422  
    1423      @unittest.skipUnless(sys.platform != 'win32',
    1424                           "Don't support pipes for Windows")
    1425      def test_unclosed_pipe_transport(self):
    1426          # This test reproduces the issue #314 on GitHub
    1427          loop = self.create_event_loop()
    1428          read_proto = MyReadPipeProto(loop=loop)
    1429          write_proto = MyWritePipeProto(loop=loop)
    1430  
    1431          rpipe, wpipe = os.pipe()
    1432          rpipeobj = io.open(rpipe, 'rb', 1024)
    1433          wpipeobj = io.open(wpipe, 'w', 1024, encoding="utf-8")
    1434  
    1435          async def connect():
    1436              read_transport, _ = await loop.connect_read_pipe(
    1437                  lambda: read_proto, rpipeobj)
    1438              write_transport, _ = await loop.connect_write_pipe(
    1439                  lambda: write_proto, wpipeobj)
    1440              return read_transport, write_transport
    1441  
    1442          # Run and close the loop without closing the transports
    1443          read_transport, write_transport = loop.run_until_complete(connect())
    1444          loop.close()
    1445  
    1446          # These 'repr' calls used to raise an AttributeError
    1447          # See Issue #314 on GitHub
    1448          self.assertIn('open', repr(read_transport))
    1449          self.assertIn('open', repr(write_transport))
    1450  
    1451          # Clean up (avoid ResourceWarning)
    1452          rpipeobj.close()
    1453          wpipeobj.close()
    1454          read_transport._pipe = None
    1455          write_transport._pipe = None
    1456  
    1457      @unittest.skipUnless(sys.platform != 'win32',
    1458                           "Don't support pipes for Windows")
    1459      @unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()')
    1460      def test_read_pty_output(self):
    1461          proto = MyReadPipeProto(loop=self.loop)
    1462  
    1463          master, slave = os.openpty()
    1464          master_read_obj = io.open(master, 'rb', 0)
    1465  
    1466          async def connect():
    1467              t, p = await self.loop.connect_read_pipe(lambda: proto,
    1468                                                       master_read_obj)
    1469              self.assertIs(p, proto)
    1470              self.assertIs(t, proto.transport)
    1471              self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
    1472              self.assertEqual(0, proto.nbytes)
    1473  
    1474          self.loop.run_until_complete(connect())
    1475  
    1476          os.write(slave, b'1')
    1477          test_utils.run_until(self.loop, lambda: proto.nbytes)
    1478          self.assertEqual(1, proto.nbytes)
    1479  
    1480          os.write(slave, b'2345')
    1481          test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
    1482          self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
    1483          self.assertEqual(5, proto.nbytes)
    1484  
    1485          os.close(slave)
    1486          proto.transport.close()
    1487          self.loop.run_until_complete(proto.done)
    1488          self.assertEqual(
    1489              ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
    1490          # extra info is available
    1491          self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
    1492  
    1493      @unittest.skipUnless(sys.platform != 'win32',
    1494                           "Don't support pipes for Windows")
    1495      def test_write_pipe(self):
    1496          rpipe, wpipe = os.pipe()
    1497          pipeobj = io.open(wpipe, 'wb', 1024)
    1498  
    1499          proto = MyWritePipeProto(loop=self.loop)
    1500          connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
    1501          transport, p = self.loop.run_until_complete(connect)
    1502          self.assertIs(p, proto)
    1503          self.assertIs(transport, proto.transport)
    1504          self.assertEqual('CONNECTED', proto.state)
    1505  
    1506          transport.write(b'1')
    1507  
    1508          data = bytearray()
    1509          def reader(data):
    1510              chunk = os.read(rpipe, 1024)
    1511              data += chunk
    1512              return len(data)
    1513  
    1514          test_utils.run_until(self.loop, lambda: reader(data) >= 1)
    1515          self.assertEqual(b'1', data)
    1516  
    1517          transport.write(b'2345')
    1518          test_utils.run_until(self.loop, lambda: reader(data) >= 5)
    1519          self.assertEqual(b'12345', data)
    1520          self.assertEqual('CONNECTED', proto.state)
    1521  
    1522          os.close(rpipe)
    1523  
    1524          # extra info is available
    1525          self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
    1526  
    1527          # close connection
    1528          proto.transport.close()
    1529          self.loop.run_until_complete(proto.done)
    1530          self.assertEqual('CLOSED', proto.state)
    1531  
    1532      @unittest.skipUnless(sys.platform != 'win32',
    1533                           "Don't support pipes for Windows")
    1534      def test_write_pipe_disconnect_on_close(self):
    1535          rsock, wsock = socket.socketpair()
    1536          rsock.setblocking(False)
    1537          pipeobj = io.open(wsock.detach(), 'wb', 1024)
    1538  
    1539          proto = MyWritePipeProto(loop=self.loop)
    1540          connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
    1541          transport, p = self.loop.run_until_complete(connect)
    1542          self.assertIs(p, proto)
    1543          self.assertIs(transport, proto.transport)
    1544          self.assertEqual('CONNECTED', proto.state)
    1545  
    1546          transport.write(b'1')
    1547          data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024))
    1548          self.assertEqual(b'1', data)
    1549  
    1550          rsock.close()
    1551  
    1552          self.loop.run_until_complete(proto.done)
    1553          self.assertEqual('CLOSED', proto.state)
    1554  
    1555      @unittest.skipUnless(sys.platform != 'win32',
    1556                           "Don't support pipes for Windows")
    1557      @unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()')
    1558      # select, poll and kqueue don't support character devices (PTY) on Mac OS X
    1559      # older than 10.6 (Snow Leopard)
    1560      @support.requires_mac_ver(10, 6)
    1561      def test_write_pty(self):
    1562          master, slave = os.openpty()
    1563          slave_write_obj = io.open(slave, 'wb', 0)
    1564  
    1565          proto = MyWritePipeProto(loop=self.loop)
    1566          connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj)
    1567          transport, p = self.loop.run_until_complete(connect)
    1568          self.assertIs(p, proto)
    1569          self.assertIs(transport, proto.transport)
    1570          self.assertEqual('CONNECTED', proto.state)
    1571  
    1572          transport.write(b'1')
    1573  
    1574          data = bytearray()
    1575          def reader(data):
    1576              chunk = os.read(master, 1024)
    1577              data += chunk
    1578              return len(data)
    1579  
    1580          test_utils.run_until(self.loop, lambda: reader(data) >= 1,
    1581                               timeout=support.SHORT_TIMEOUT)
    1582          self.assertEqual(b'1', data)
    1583  
    1584          transport.write(b'2345')
    1585          test_utils.run_until(self.loop, lambda: reader(data) >= 5,
    1586                               timeout=support.SHORT_TIMEOUT)
    1587          self.assertEqual(b'12345', data)
    1588          self.assertEqual('CONNECTED', proto.state)
    1589  
    1590          os.close(master)
    1591  
    1592          # extra info is available
    1593          self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
    1594  
    1595          # close connection
    1596          proto.transport.close()
    1597          self.loop.run_until_complete(proto.done)
    1598          self.assertEqual('CLOSED', proto.state)
    1599  
    1600      @unittest.skipUnless(sys.platform != 'win32',
    1601                           "Don't support pipes for Windows")
    1602      @unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()')
    1603      # select, poll and kqueue don't support character devices (PTY) on Mac OS X
    1604      # older than 10.6 (Snow Leopard)
    1605      @support.requires_mac_ver(10, 6)
    1606      def test_bidirectional_pty(self):
    1607          master, read_slave = os.openpty()
    1608          write_slave = os.dup(read_slave)
    1609          tty.setraw(read_slave)
    1610  
    1611          slave_read_obj = io.open(read_slave, 'rb', 0)
    1612          read_proto = MyReadPipeProto(loop=self.loop)
    1613          read_connect = self.loop.connect_read_pipe(lambda: read_proto,
    1614                                                     slave_read_obj)
    1615          read_transport, p = self.loop.run_until_complete(read_connect)
    1616          self.assertIs(p, read_proto)
    1617          self.assertIs(read_transport, read_proto.transport)
    1618          self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
    1619          self.assertEqual(0, read_proto.nbytes)
    1620  
    1621  
    1622          slave_write_obj = io.open(write_slave, 'wb', 0)
    1623          write_proto = MyWritePipeProto(loop=self.loop)
    1624          write_connect = self.loop.connect_write_pipe(lambda: write_proto,
    1625                                                       slave_write_obj)
    1626          write_transport, p = self.loop.run_until_complete(write_connect)
    1627          self.assertIs(p, write_proto)
    1628          self.assertIs(write_transport, write_proto.transport)
    1629          self.assertEqual('CONNECTED', write_proto.state)
    1630  
    1631          data = bytearray()
    1632          def reader(data):
    1633              chunk = os.read(master, 1024)
    1634              data += chunk
    1635              return len(data)
    1636  
    1637          write_transport.write(b'1')
    1638          test_utils.run_until(self.loop, lambda: reader(data) >= 1,
    1639                               timeout=support.SHORT_TIMEOUT)
    1640          self.assertEqual(b'1', data)
    1641          self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
    1642          self.assertEqual('CONNECTED', write_proto.state)
    1643  
    1644          os.write(master, b'a')
    1645          test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
    1646                               timeout=support.SHORT_TIMEOUT)
    1647          self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
    1648          self.assertEqual(1, read_proto.nbytes)
    1649          self.assertEqual('CONNECTED', write_proto.state)
    1650  
    1651          write_transport.write(b'2345')
    1652          test_utils.run_until(self.loop, lambda: reader(data) >= 5,
    1653                               timeout=support.SHORT_TIMEOUT)
    1654          self.assertEqual(b'12345', data)
    1655          self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
    1656          self.assertEqual('CONNECTED', write_proto.state)
    1657  
    1658          os.write(master, b'bcde')
    1659          test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
    1660                               timeout=support.SHORT_TIMEOUT)
    1661          self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
    1662          self.assertEqual(5, read_proto.nbytes)
    1663          self.assertEqual('CONNECTED', write_proto.state)
    1664  
    1665          os.close(master)
    1666  
    1667          read_transport.close()
    1668          self.loop.run_until_complete(read_proto.done)
    1669          self.assertEqual(
    1670              ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)
    1671  
    1672          write_transport.close()
    1673          self.loop.run_until_complete(write_proto.done)
    1674          self.assertEqual('CLOSED', write_proto.state)
    1675  
    1676      def test_prompt_cancellation(self):
    1677          r, w = socket.socketpair()
    1678          r.setblocking(False)
    1679          f = self.loop.create_task(self.loop.sock_recv(r, 1))
    1680          ov = getattr(f, 'ov', None)
    1681          if ov is not None:
    1682              self.assertTrue(ov.pending)
    1683  
    1684          async def main():
    1685              try:
    1686                  self.loop.call_soon(f.cancel)
    1687                  await f
    1688              except asyncio.CancelledError:
    1689                  res = 'cancelled'
    1690              else:
    1691                  res = None
    1692              finally:
    1693                  self.loop.stop()
    1694              return res
    1695  
    1696          start = time.monotonic()
    1697          t = self.loop.create_task(main())
    1698          self.loop.run_forever()
    1699          elapsed = time.monotonic() - start
    1700  
    1701          self.assertLess(elapsed, 0.1)
    1702          self.assertEqual(t.result(), 'cancelled')
    1703          self.assertRaises(asyncio.CancelledError, f.result)
    1704          if ov is not None:
    1705              self.assertFalse(ov.pending)
    1706          self.loop._stop_serving(r)
    1707  
    1708          r.close()
    1709          w.close()
    1710  
    1711      def test_timeout_rounding(self):
    1712          def _run_once():
    1713              self.loop._run_once_counter += 1
    1714              orig_run_once()
    1715  
    1716          orig_run_once = self.loop._run_once
    1717          self.loop._run_once_counter = 0
    1718          self.loop._run_once = _run_once
    1719  
    1720          async def wait():
    1721              loop = self.loop
    1722              await asyncio.sleep(1e-2)
    1723              await asyncio.sleep(1e-4)
    1724              await asyncio.sleep(1e-6)
    1725              await asyncio.sleep(1e-8)
    1726              await asyncio.sleep(1e-10)
    1727  
    1728          self.loop.run_until_complete(wait())
    1729          # The ideal number of call is 12, but on some platforms, the selector
    1730          # may sleep at little bit less than timeout depending on the resolution
    1731          # of the clock used by the kernel. Tolerate a few useless calls on
    1732          # these platforms.
    1733          self.assertLessEqual(self.loop._run_once_counter, 20,
    1734              {'clock_resolution': self.loop._clock_resolution,
    1735               'selector': self.loop._selector.__class__.__name__})
    1736  
    1737      def test_remove_fds_after_closing(self):
    1738          loop = self.create_event_loop()
    1739          callback = lambda: None
    1740          r, w = socket.socketpair()
    1741          self.addCleanup(r.close)
    1742          self.addCleanup(w.close)
    1743          loop.add_reader(r, callback)
    1744          loop.add_writer(w, callback)
    1745          loop.close()
    1746          self.assertFalse(loop.remove_reader(r))
    1747          self.assertFalse(loop.remove_writer(w))
    1748  
    1749      def test_add_fds_after_closing(self):
    1750          loop = self.create_event_loop()
    1751          callback = lambda: None
    1752          r, w = socket.socketpair()
    1753          self.addCleanup(r.close)
    1754          self.addCleanup(w.close)
    1755          loop.close()
    1756          with self.assertRaises(RuntimeError):
    1757              loop.add_reader(r, callback)
    1758          with self.assertRaises(RuntimeError):
    1759              loop.add_writer(w, callback)
    1760  
    1761      def test_close_running_event_loop(self):
    1762          async def close_loop(loop):
    1763              self.loop.close()
    1764  
    1765          coro = close_loop(self.loop)
    1766          with self.assertRaises(RuntimeError):
    1767              self.loop.run_until_complete(coro)
    1768  
    1769      def test_close(self):
    1770          self.loop.close()
    1771  
    1772          async def test():
    1773              pass
    1774  
    1775          func = lambda: False
    1776          coro = test()
    1777          self.addCleanup(coro.close)
    1778  
    1779          # operation blocked when the loop is closed
    1780          with self.assertRaises(RuntimeError):
    1781              self.loop.run_forever()
    1782          with self.assertRaises(RuntimeError):
    1783              fut = self.loop.create_future()
    1784              self.loop.run_until_complete(fut)
    1785          with self.assertRaises(RuntimeError):
    1786              self.loop.call_soon(func)
    1787          with self.assertRaises(RuntimeError):
    1788              self.loop.call_soon_threadsafe(func)
    1789          with self.assertRaises(RuntimeError):
    1790              self.loop.call_later(1.0, func)
    1791          with self.assertRaises(RuntimeError):
    1792              self.loop.call_at(self.loop.time() + .0, func)
    1793          with self.assertRaises(RuntimeError):
    1794              self.loop.create_task(coro)
    1795          with self.assertRaises(RuntimeError):
    1796              self.loop.add_signal_handler(signal.SIGTERM, func)
    1797  
    1798          # run_in_executor test is tricky: the method is a coroutine,
    1799          # but run_until_complete cannot be called on closed loop.
    1800          # Thus iterate once explicitly.
    1801          with self.assertRaises(RuntimeError):
    1802              it = self.loop.run_in_executor(None, func).__await__()
    1803              next(it)
    1804  
    1805  
    1806  class ESC[4;38;5;81mSubprocessTestsMixin:
    1807  
    1808      def check_terminated(self, returncode):
    1809          if sys.platform == 'win32':
    1810              self.assertIsInstance(returncode, int)
    1811              # expect 1 but sometimes get 0
    1812          else:
    1813              self.assertEqual(-signal.SIGTERM, returncode)
    1814  
    1815      def check_killed(self, returncode):
    1816          if sys.platform == 'win32':
    1817              self.assertIsInstance(returncode, int)
    1818              # expect 1 but sometimes get 0
    1819          else:
    1820              self.assertEqual(-signal.SIGKILL, returncode)
    1821  
    1822      def test_subprocess_exec(self):
    1823          prog = os.path.join(os.path.dirname(__file__), 'echo.py')
    1824  
    1825          connect = self.loop.subprocess_exec(
    1826                          functools.partial(MySubprocessProtocol, self.loop),
    1827                          sys.executable, prog)
    1828  
    1829          transp, proto = self.loop.run_until_complete(connect)
    1830          self.assertIsInstance(proto, MySubprocessProtocol)
    1831          self.loop.run_until_complete(proto.connected)
    1832          self.assertEqual('CONNECTED', proto.state)
    1833  
    1834          stdin = transp.get_pipe_transport(0)
    1835          stdin.write(b'Python The Winner')
    1836          self.loop.run_until_complete(proto.got_data[1].wait())
    1837          with test_utils.disable_logger():
    1838              transp.close()
    1839          self.loop.run_until_complete(proto.completed)
    1840          self.check_killed(proto.returncode)
    1841          self.assertEqual(b'Python The Winner', proto.data[1])
    1842  
    1843      def test_subprocess_interactive(self):
    1844          prog = os.path.join(os.path.dirname(__file__), 'echo.py')
    1845  
    1846          connect = self.loop.subprocess_exec(
    1847                          functools.partial(MySubprocessProtocol, self.loop),
    1848                          sys.executable, prog)
    1849  
    1850          transp, proto = self.loop.run_until_complete(connect)
    1851          self.assertIsInstance(proto, MySubprocessProtocol)
    1852          self.loop.run_until_complete(proto.connected)
    1853          self.assertEqual('CONNECTED', proto.state)
    1854  
    1855          stdin = transp.get_pipe_transport(0)
    1856          stdin.write(b'Python ')
    1857          self.loop.run_until_complete(proto.got_data[1].wait())
    1858          proto.got_data[1].clear()
    1859          self.assertEqual(b'Python ', proto.data[1])
    1860  
    1861          stdin.write(b'The Winner')
    1862          self.loop.run_until_complete(proto.got_data[1].wait())
    1863          self.assertEqual(b'Python The Winner', proto.data[1])
    1864  
    1865          with test_utils.disable_logger():
    1866              transp.close()
    1867          self.loop.run_until_complete(proto.completed)
    1868          self.check_killed(proto.returncode)
    1869  
    1870      def test_subprocess_shell(self):
    1871          connect = self.loop.subprocess_shell(
    1872                          functools.partial(MySubprocessProtocol, self.loop),
    1873                          'echo Python')
    1874          transp, proto = self.loop.run_until_complete(connect)
    1875          self.assertIsInstance(proto, MySubprocessProtocol)
    1876          self.loop.run_until_complete(proto.connected)
    1877  
    1878          transp.get_pipe_transport(0).close()
    1879          self.loop.run_until_complete(proto.completed)
    1880          self.assertEqual(0, proto.returncode)
    1881          self.assertTrue(all(f.done() for f in proto.disconnects.values()))
    1882          self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python')
    1883          self.assertEqual(proto.data[2], b'')
    1884          transp.close()
    1885  
    1886      def test_subprocess_exitcode(self):
    1887          connect = self.loop.subprocess_shell(
    1888                          functools.partial(MySubprocessProtocol, self.loop),
    1889                          'exit 7', stdin=None, stdout=None, stderr=None)
    1890  
    1891          transp, proto = self.loop.run_until_complete(connect)
    1892          self.assertIsInstance(proto, MySubprocessProtocol)
    1893          self.loop.run_until_complete(proto.completed)
    1894          self.assertEqual(7, proto.returncode)
    1895          transp.close()
    1896  
    1897      def test_subprocess_close_after_finish(self):
    1898          connect = self.loop.subprocess_shell(
    1899                          functools.partial(MySubprocessProtocol, self.loop),
    1900                          'exit 7', stdin=None, stdout=None, stderr=None)
    1901  
    1902          transp, proto = self.loop.run_until_complete(connect)
    1903          self.assertIsInstance(proto, MySubprocessProtocol)
    1904          self.assertIsNone(transp.get_pipe_transport(0))
    1905          self.assertIsNone(transp.get_pipe_transport(1))
    1906          self.assertIsNone(transp.get_pipe_transport(2))
    1907          self.loop.run_until_complete(proto.completed)
    1908          self.assertEqual(7, proto.returncode)
    1909          self.assertIsNone(transp.close())
    1910  
    1911      def test_subprocess_kill(self):
    1912          prog = os.path.join(os.path.dirname(__file__), 'echo.py')
    1913  
    1914          connect = self.loop.subprocess_exec(
    1915                          functools.partial(MySubprocessProtocol, self.loop),
    1916                          sys.executable, prog)
    1917  
    1918          transp, proto = self.loop.run_until_complete(connect)
    1919          self.assertIsInstance(proto, MySubprocessProtocol)
    1920          self.loop.run_until_complete(proto.connected)
    1921  
    1922          transp.kill()
    1923          self.loop.run_until_complete(proto.completed)
    1924          self.check_killed(proto.returncode)
    1925          transp.close()
    1926  
    1927      def test_subprocess_terminate(self):
    1928          prog = os.path.join(os.path.dirname(__file__), 'echo.py')
    1929  
    1930          connect = self.loop.subprocess_exec(
    1931                          functools.partial(MySubprocessProtocol, self.loop),
    1932                          sys.executable, prog)
    1933  
    1934          transp, proto = self.loop.run_until_complete(connect)
    1935          self.assertIsInstance(proto, MySubprocessProtocol)
    1936          self.loop.run_until_complete(proto.connected)
    1937  
    1938          transp.terminate()
    1939          self.loop.run_until_complete(proto.completed)
    1940          self.check_terminated(proto.returncode)
    1941          transp.close()
    1942  
    1943      @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
    1944      def test_subprocess_send_signal(self):
    1945          # bpo-31034: Make sure that we get the default signal handler (killing
    1946          # the process). The parent process may have decided to ignore SIGHUP,
    1947          # and signal handlers are inherited.
    1948          old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
    1949          try:
    1950              prog = os.path.join(os.path.dirname(__file__), 'echo.py')
    1951  
    1952              connect = self.loop.subprocess_exec(
    1953                              functools.partial(MySubprocessProtocol, self.loop),
    1954                              sys.executable, prog)
    1955  
    1956  
    1957              transp, proto = self.loop.run_until_complete(connect)
    1958              self.assertIsInstance(proto, MySubprocessProtocol)
    1959              self.loop.run_until_complete(proto.connected)
    1960  
    1961              transp.send_signal(signal.SIGHUP)
    1962              self.loop.run_until_complete(proto.completed)
    1963              self.assertEqual(-signal.SIGHUP, proto.returncode)
    1964              transp.close()
    1965          finally:
    1966              signal.signal(signal.SIGHUP, old_handler)
    1967  
    1968      def test_subprocess_stderr(self):
    1969          prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
    1970  
    1971          connect = self.loop.subprocess_exec(
    1972                          functools.partial(MySubprocessProtocol, self.loop),
    1973                          sys.executable, prog)
    1974  
    1975          transp, proto = self.loop.run_until_complete(connect)
    1976          self.assertIsInstance(proto, MySubprocessProtocol)
    1977          self.loop.run_until_complete(proto.connected)
    1978  
    1979          stdin = transp.get_pipe_transport(0)
    1980          stdin.write(b'test')
    1981  
    1982          self.loop.run_until_complete(proto.completed)
    1983  
    1984          transp.close()
    1985          self.assertEqual(b'OUT:test', proto.data[1])
    1986          self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2])
    1987          self.assertEqual(0, proto.returncode)
    1988  
    1989      def test_subprocess_stderr_redirect_to_stdout(self):
    1990          prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
    1991  
    1992          connect = self.loop.subprocess_exec(
    1993                          functools.partial(MySubprocessProtocol, self.loop),
    1994                          sys.executable, prog, stderr=subprocess.STDOUT)
    1995  
    1996  
    1997          transp, proto = self.loop.run_until_complete(connect)
    1998          self.assertIsInstance(proto, MySubprocessProtocol)
    1999          self.loop.run_until_complete(proto.connected)
    2000  
    2001          stdin = transp.get_pipe_transport(0)
    2002          self.assertIsNotNone(transp.get_pipe_transport(1))
    2003          self.assertIsNone(transp.get_pipe_transport(2))
    2004  
    2005          stdin.write(b'test')
    2006          self.loop.run_until_complete(proto.completed)
    2007          self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'),
    2008                          proto.data[1])
    2009          self.assertEqual(b'', proto.data[2])
    2010  
    2011          transp.close()
    2012          self.assertEqual(0, proto.returncode)
    2013  
    2014      def test_subprocess_close_client_stream(self):
    2015          prog = os.path.join(os.path.dirname(__file__), 'echo3.py')
    2016  
    2017          connect = self.loop.subprocess_exec(
    2018                          functools.partial(MySubprocessProtocol, self.loop),
    2019                          sys.executable, prog)
    2020  
    2021          transp, proto = self.loop.run_until_complete(connect)
    2022          self.assertIsInstance(proto, MySubprocessProtocol)
    2023          self.loop.run_until_complete(proto.connected)
    2024  
    2025          stdin = transp.get_pipe_transport(0)
    2026          stdout = transp.get_pipe_transport(1)
    2027          stdin.write(b'test')
    2028          self.loop.run_until_complete(proto.got_data[1].wait())
    2029          self.assertEqual(b'OUT:test', proto.data[1])
    2030  
    2031          stdout.close()
    2032          self.loop.run_until_complete(proto.disconnects[1])
    2033          stdin.write(b'xxx')
    2034          self.loop.run_until_complete(proto.got_data[2].wait())
    2035          if sys.platform != 'win32':
    2036              self.assertEqual(b'ERR:BrokenPipeError', proto.data[2])
    2037          else:
    2038              # After closing the read-end of a pipe, writing to the
    2039              # write-end using os.write() fails with errno==EINVAL and
    2040              # GetLastError()==ERROR_INVALID_NAME on Windows!?!  (Using
    2041              # WriteFile() we get ERROR_BROKEN_PIPE as expected.)
    2042              self.assertEqual(b'ERR:OSError', proto.data[2])
    2043          with test_utils.disable_logger():
    2044              transp.close()
    2045          self.loop.run_until_complete(proto.completed)
    2046          self.check_killed(proto.returncode)
    2047  
    2048      def test_subprocess_wait_no_same_group(self):
    2049          # start the new process in a new session
    2050          connect = self.loop.subprocess_shell(
    2051                          functools.partial(MySubprocessProtocol, self.loop),
    2052                          'exit 7', stdin=None, stdout=None, stderr=None,
    2053                          start_new_session=True)
    2054          transp, proto = self.loop.run_until_complete(connect)
    2055          self.assertIsInstance(proto, MySubprocessProtocol)
    2056          self.loop.run_until_complete(proto.completed)
    2057          self.assertEqual(7, proto.returncode)
    2058          transp.close()
    2059  
    2060      def test_subprocess_exec_invalid_args(self):
    2061          async def connect(**kwds):
    2062              await self.loop.subprocess_exec(
    2063                  asyncio.SubprocessProtocol,
    2064                  'pwd', **kwds)
    2065  
    2066          with self.assertRaises(ValueError):
    2067              self.loop.run_until_complete(connect(universal_newlines=True))
    2068          with self.assertRaises(ValueError):
    2069              self.loop.run_until_complete(connect(bufsize=4096))
    2070          with self.assertRaises(ValueError):
    2071              self.loop.run_until_complete(connect(shell=True))
    2072  
    2073      def test_subprocess_shell_invalid_args(self):
    2074  
    2075          async def connect(cmd=None, **kwds):
    2076              if not cmd:
    2077                  cmd = 'pwd'
    2078              await self.loop.subprocess_shell(
    2079                  asyncio.SubprocessProtocol,
    2080                  cmd, **kwds)
    2081  
    2082          with self.assertRaises(ValueError):
    2083              self.loop.run_until_complete(connect(['ls', '-l']))
    2084          with self.assertRaises(ValueError):
    2085              self.loop.run_until_complete(connect(universal_newlines=True))
    2086          with self.assertRaises(ValueError):
    2087              self.loop.run_until_complete(connect(bufsize=4096))
    2088          with self.assertRaises(ValueError):
    2089              self.loop.run_until_complete(connect(shell=False))
    2090  
    2091  
    2092  if sys.platform == 'win32':
    2093  
    2094      class ESC[4;38;5;81mSelectEventLoopTests(ESC[4;38;5;149mEventLoopTestsMixin,
    2095                                 ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2096  
    2097          def create_event_loop(self):
    2098              return asyncio.SelectorEventLoop()
    2099  
    2100      class ESC[4;38;5;81mProactorEventLoopTests(ESC[4;38;5;149mEventLoopTestsMixin,
    2101                                   ESC[4;38;5;149mSubprocessTestsMixin,
    2102                                   ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2103  
    2104          def create_event_loop(self):
    2105              return asyncio.ProactorEventLoop()
    2106  
    2107          def test_reader_callback(self):
    2108              raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
    2109  
    2110          def test_reader_callback_cancel(self):
    2111              raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
    2112  
    2113          def test_writer_callback(self):
    2114              raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
    2115  
    2116          def test_writer_callback_cancel(self):
    2117              raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
    2118  
    2119          def test_remove_fds_after_closing(self):
    2120              raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
    2121  else:
    2122      import selectors
    2123  
    2124      class ESC[4;38;5;81mUnixEventLoopTestsMixin(ESC[4;38;5;149mEventLoopTestsMixin):
    2125          def setUp(self):
    2126              super().setUp()
    2127              with warnings.catch_warnings():
    2128                  warnings.simplefilter('ignore', DeprecationWarning)
    2129                  watcher = asyncio.SafeChildWatcher()
    2130                  watcher.attach_loop(self.loop)
    2131                  asyncio.set_child_watcher(watcher)
    2132  
    2133          def tearDown(self):
    2134              with warnings.catch_warnings():
    2135                  warnings.simplefilter('ignore', DeprecationWarning)
    2136                  asyncio.set_child_watcher(None)
    2137              super().tearDown()
    2138  
    2139  
    2140      if hasattr(selectors, 'KqueueSelector'):
    2141          class ESC[4;38;5;81mKqueueEventLoopTests(ESC[4;38;5;149mUnixEventLoopTestsMixin,
    2142                                     ESC[4;38;5;149mSubprocessTestsMixin,
    2143                                     ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2144  
    2145              def create_event_loop(self):
    2146                  return asyncio.SelectorEventLoop(
    2147                      selectors.KqueueSelector())
    2148  
    2149              # kqueue doesn't support character devices (PTY) on Mac OS X older
    2150              # than 10.9 (Maverick)
    2151              @support.requires_mac_ver(10, 9)
    2152              # Issue #20667: KqueueEventLoopTests.test_read_pty_output()
    2153              # hangs on OpenBSD 5.5
    2154              @unittest.skipIf(sys.platform.startswith('openbsd'),
    2155                               'test hangs on OpenBSD')
    2156              def test_read_pty_output(self):
    2157                  super().test_read_pty_output()
    2158  
    2159              # kqueue doesn't support character devices (PTY) on Mac OS X older
    2160              # than 10.9 (Maverick)
    2161              @support.requires_mac_ver(10, 9)
    2162              def test_write_pty(self):
    2163                  super().test_write_pty()
    2164  
    2165      if hasattr(selectors, 'EpollSelector'):
    2166          class ESC[4;38;5;81mEPollEventLoopTests(ESC[4;38;5;149mUnixEventLoopTestsMixin,
    2167                                    ESC[4;38;5;149mSubprocessTestsMixin,
    2168                                    ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2169  
    2170              def create_event_loop(self):
    2171                  return asyncio.SelectorEventLoop(selectors.EpollSelector())
    2172  
    2173      if hasattr(selectors, 'PollSelector'):
    2174          class ESC[4;38;5;81mPollEventLoopTests(ESC[4;38;5;149mUnixEventLoopTestsMixin,
    2175                                   ESC[4;38;5;149mSubprocessTestsMixin,
    2176                                   ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2177  
    2178              def create_event_loop(self):
    2179                  return asyncio.SelectorEventLoop(selectors.PollSelector())
    2180  
    2181      # Should always exist.
    2182      class ESC[4;38;5;81mSelectEventLoopTests(ESC[4;38;5;149mUnixEventLoopTestsMixin,
    2183                                 ESC[4;38;5;149mSubprocessTestsMixin,
    2184                                 ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2185  
    2186          def create_event_loop(self):
    2187              return asyncio.SelectorEventLoop(selectors.SelectSelector())
    2188  
    2189  
    2190  def noop(*args, **kwargs):
    2191      pass
    2192  
    2193  
    2194  class ESC[4;38;5;81mHandleTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2195  
    2196      def setUp(self):
    2197          super().setUp()
    2198          self.loop = mock.Mock()
    2199          self.loop.get_debug.return_value = True
    2200  
    2201      def test_handle(self):
    2202          def callback(*args):
    2203              return args
    2204  
    2205          args = ()
    2206          h = asyncio.Handle(callback, args, self.loop)
    2207          self.assertIs(h._callback, callback)
    2208          self.assertIs(h._args, args)
    2209          self.assertFalse(h.cancelled())
    2210  
    2211          h.cancel()
    2212          self.assertTrue(h.cancelled())
    2213  
    2214      def test_callback_with_exception(self):
    2215          def callback():
    2216              raise ValueError()
    2217  
    2218          self.loop = mock.Mock()
    2219          self.loop.call_exception_handler = mock.Mock()
    2220  
    2221          h = asyncio.Handle(callback, (), self.loop)
    2222          h._run()
    2223  
    2224          self.loop.call_exception_handler.assert_called_with({
    2225              'message': test_utils.MockPattern('Exception in callback.*'),
    2226              'exception': mock.ANY,
    2227              'handle': h,
    2228              'source_traceback': h._source_traceback,
    2229          })
    2230  
    2231      def test_handle_weakref(self):
    2232          wd = weakref.WeakValueDictionary()
    2233          h = asyncio.Handle(lambda: None, (), self.loop)
    2234          wd['h'] = h  # Would fail without __weakref__ slot.
    2235  
    2236      def test_handle_repr(self):
    2237          self.loop.get_debug.return_value = False
    2238  
    2239          # simple function
    2240          h = asyncio.Handle(noop, (1, 2), self.loop)
    2241          filename, lineno = test_utils.get_function_source(noop)
    2242          self.assertEqual(repr(h),
    2243                          '<Handle noop(1, 2) at %s:%s>'
    2244                          % (filename, lineno))
    2245  
    2246          # cancelled handle
    2247          h.cancel()
    2248          self.assertEqual(repr(h),
    2249                          '<Handle cancelled>')
    2250  
    2251          # decorated function
    2252          cb = types.coroutine(noop)
    2253          h = asyncio.Handle(cb, (), self.loop)
    2254          self.assertEqual(repr(h),
    2255                          '<Handle noop() at %s:%s>'
    2256                          % (filename, lineno))
    2257  
    2258          # partial function
    2259          cb = functools.partial(noop, 1, 2)
    2260          h = asyncio.Handle(cb, (3,), self.loop)
    2261          regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$'
    2262                   % (re.escape(filename), lineno))
    2263          self.assertRegex(repr(h), regex)
    2264  
    2265          # partial function with keyword args
    2266          cb = functools.partial(noop, x=1)
    2267          h = asyncio.Handle(cb, (2, 3), self.loop)
    2268          regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$'
    2269                   % (re.escape(filename), lineno))
    2270          self.assertRegex(repr(h), regex)
    2271  
    2272          # partial method
    2273          method = HandleTests.test_handle_repr
    2274          cb = functools.partialmethod(method)
    2275          filename, lineno = test_utils.get_function_source(method)
    2276          h = asyncio.Handle(cb, (), self.loop)
    2277  
    2278          cb_regex = r'<function HandleTests.test_handle_repr .*>'
    2279          cb_regex = fr'functools.partialmethod\({cb_regex}, , \)\(\)'
    2280          regex = fr'^<Handle {cb_regex} at {re.escape(filename)}:{lineno}>$'
    2281          self.assertRegex(repr(h), regex)
    2282  
    2283      def test_handle_repr_debug(self):
    2284          self.loop.get_debug.return_value = True
    2285  
    2286          # simple function
    2287          create_filename = __file__
    2288          create_lineno = sys._getframe().f_lineno + 1
    2289          h = asyncio.Handle(noop, (1, 2), self.loop)
    2290          filename, lineno = test_utils.get_function_source(noop)
    2291          self.assertEqual(repr(h),
    2292                          '<Handle noop(1, 2) at %s:%s created at %s:%s>'
    2293                          % (filename, lineno, create_filename, create_lineno))
    2294  
    2295          # cancelled handle
    2296          h.cancel()
    2297          self.assertEqual(
    2298              repr(h),
    2299              '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
    2300              % (filename, lineno, create_filename, create_lineno))
    2301  
    2302          # double cancellation won't overwrite _repr
    2303          h.cancel()
    2304          self.assertEqual(
    2305              repr(h),
    2306              '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
    2307              % (filename, lineno, create_filename, create_lineno))
    2308  
    2309      def test_handle_source_traceback(self):
    2310          loop = asyncio.get_event_loop_policy().new_event_loop()
    2311          loop.set_debug(True)
    2312          self.set_event_loop(loop)
    2313  
    2314          def check_source_traceback(h):
    2315              lineno = sys._getframe(1).f_lineno - 1
    2316              self.assertIsInstance(h._source_traceback, list)
    2317              self.assertEqual(h._source_traceback[-1][:3],
    2318                               (__file__,
    2319                                lineno,
    2320                                'test_handle_source_traceback'))
    2321  
    2322          # call_soon
    2323          h = loop.call_soon(noop)
    2324          check_source_traceback(h)
    2325  
    2326          # call_soon_threadsafe
    2327          h = loop.call_soon_threadsafe(noop)
    2328          check_source_traceback(h)
    2329  
    2330          # call_later
    2331          h = loop.call_later(0, noop)
    2332          check_source_traceback(h)
    2333  
    2334          # call_at
    2335          h = loop.call_later(0, noop)
    2336          check_source_traceback(h)
    2337  
    2338      @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'),
    2339                           'No collections.abc.Coroutine')
    2340      def test_coroutine_like_object_debug_formatting(self):
    2341          # Test that asyncio can format coroutines that are instances of
    2342          # collections.abc.Coroutine, but lack cr_core or gi_code attributes
    2343          # (such as ones compiled with Cython).
    2344  
    2345          coro = CoroLike()
    2346          coro.__name__ = 'AAA'
    2347          self.assertTrue(asyncio.iscoroutine(coro))
    2348          self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')
    2349  
    2350          coro.__qualname__ = 'BBB'
    2351          self.assertEqual(coroutines._format_coroutine(coro), 'BBB()')
    2352  
    2353          coro.cr_running = True
    2354          self.assertEqual(coroutines._format_coroutine(coro), 'BBB() running')
    2355  
    2356          coro.__name__ = coro.__qualname__ = None
    2357          self.assertEqual(coroutines._format_coroutine(coro),
    2358                           '<CoroLike without __name__>() running')
    2359  
    2360          coro = CoroLike()
    2361          coro.__qualname__ = 'CoroLike'
    2362          # Some coroutines might not have '__name__', such as
    2363          # built-in async_gen.asend().
    2364          self.assertEqual(coroutines._format_coroutine(coro), 'CoroLike()')
    2365  
    2366          coro = CoroLike()
    2367          coro.__qualname__ = 'AAA'
    2368          coro.cr_code = None
    2369          self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')
    2370  
    2371  
    2372  class ESC[4;38;5;81mTimerTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2373  
    2374      def setUp(self):
    2375          super().setUp()
    2376          self.loop = mock.Mock()
    2377  
    2378      def test_hash(self):
    2379          when = time.monotonic()
    2380          h = asyncio.TimerHandle(when, lambda: False, (),
    2381                                  mock.Mock())
    2382          self.assertEqual(hash(h), hash(when))
    2383  
    2384      def test_when(self):
    2385          when = time.monotonic()
    2386          h = asyncio.TimerHandle(when, lambda: False, (),
    2387                                  mock.Mock())
    2388          self.assertEqual(when, h.when())
    2389  
    2390      def test_timer(self):
    2391          def callback(*args):
    2392              return args
    2393  
    2394          args = (1, 2, 3)
    2395          when = time.monotonic()
    2396          h = asyncio.TimerHandle(when, callback, args, mock.Mock())
    2397          self.assertIs(h._callback, callback)
    2398          self.assertIs(h._args, args)
    2399          self.assertFalse(h.cancelled())
    2400  
    2401          # cancel
    2402          h.cancel()
    2403          self.assertTrue(h.cancelled())
    2404          self.assertIsNone(h._callback)
    2405          self.assertIsNone(h._args)
    2406  
    2407  
    2408      def test_timer_repr(self):
    2409          self.loop.get_debug.return_value = False
    2410  
    2411          # simple function
    2412          h = asyncio.TimerHandle(123, noop, (), self.loop)
    2413          src = test_utils.get_function_source(noop)
    2414          self.assertEqual(repr(h),
    2415                          '<TimerHandle when=123 noop() at %s:%s>' % src)
    2416  
    2417          # cancelled handle
    2418          h.cancel()
    2419          self.assertEqual(repr(h),
    2420                          '<TimerHandle cancelled when=123>')
    2421  
    2422      def test_timer_repr_debug(self):
    2423          self.loop.get_debug.return_value = True
    2424  
    2425          # simple function
    2426          create_filename = __file__
    2427          create_lineno = sys._getframe().f_lineno + 1
    2428          h = asyncio.TimerHandle(123, noop, (), self.loop)
    2429          filename, lineno = test_utils.get_function_source(noop)
    2430          self.assertEqual(repr(h),
    2431                          '<TimerHandle when=123 noop() '
    2432                          'at %s:%s created at %s:%s>'
    2433                          % (filename, lineno, create_filename, create_lineno))
    2434  
    2435          # cancelled handle
    2436          h.cancel()
    2437          self.assertEqual(repr(h),
    2438                          '<TimerHandle cancelled when=123 noop() '
    2439                          'at %s:%s created at %s:%s>'
    2440                          % (filename, lineno, create_filename, create_lineno))
    2441  
    2442  
    2443      def test_timer_comparison(self):
    2444          def callback(*args):
    2445              return args
    2446  
    2447          when = time.monotonic()
    2448  
    2449          h1 = asyncio.TimerHandle(when, callback, (), self.loop)
    2450          h2 = asyncio.TimerHandle(when, callback, (), self.loop)
    2451          # TODO: Use assertLess etc.
    2452          self.assertFalse(h1 < h2)
    2453          self.assertFalse(h2 < h1)
    2454          self.assertTrue(h1 <= h2)
    2455          self.assertTrue(h2 <= h1)
    2456          self.assertFalse(h1 > h2)
    2457          self.assertFalse(h2 > h1)
    2458          self.assertTrue(h1 >= h2)
    2459          self.assertTrue(h2 >= h1)
    2460          self.assertTrue(h1 == h2)
    2461          self.assertFalse(h1 != h2)
    2462  
    2463          h2.cancel()
    2464          self.assertFalse(h1 == h2)
    2465  
    2466          h1 = asyncio.TimerHandle(when, callback, (), self.loop)
    2467          h2 = asyncio.TimerHandle(when + 10.0, callback, (), self.loop)
    2468          self.assertTrue(h1 < h2)
    2469          self.assertFalse(h2 < h1)
    2470          self.assertTrue(h1 <= h2)
    2471          self.assertFalse(h2 <= h1)
    2472          self.assertFalse(h1 > h2)
    2473          self.assertTrue(h2 > h1)
    2474          self.assertFalse(h1 >= h2)
    2475          self.assertTrue(h2 >= h1)
    2476          self.assertFalse(h1 == h2)
    2477          self.assertTrue(h1 != h2)
    2478  
    2479          h3 = asyncio.Handle(callback, (), self.loop)
    2480          self.assertIs(NotImplemented, h1.__eq__(h3))
    2481          self.assertIs(NotImplemented, h1.__ne__(h3))
    2482  
    2483          with self.assertRaises(TypeError):
    2484              h1 < ()
    2485          with self.assertRaises(TypeError):
    2486              h1 > ()
    2487          with self.assertRaises(TypeError):
    2488              h1 <= ()
    2489          with self.assertRaises(TypeError):
    2490              h1 >= ()
    2491          self.assertFalse(h1 == ())
    2492          self.assertTrue(h1 != ())
    2493  
    2494          self.assertTrue(h1 == ALWAYS_EQ)
    2495          self.assertFalse(h1 != ALWAYS_EQ)
    2496          self.assertTrue(h1 < LARGEST)
    2497          self.assertFalse(h1 > LARGEST)
    2498          self.assertTrue(h1 <= LARGEST)
    2499          self.assertFalse(h1 >= LARGEST)
    2500          self.assertFalse(h1 < SMALLEST)
    2501          self.assertTrue(h1 > SMALLEST)
    2502          self.assertFalse(h1 <= SMALLEST)
    2503          self.assertTrue(h1 >= SMALLEST)
    2504  
    2505  
    2506  class ESC[4;38;5;81mAbstractEventLoopTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2507  
    2508      def test_not_implemented(self):
    2509          f = mock.Mock()
    2510          loop = asyncio.AbstractEventLoop()
    2511          self.assertRaises(
    2512              NotImplementedError, loop.run_forever)
    2513          self.assertRaises(
    2514              NotImplementedError, loop.run_until_complete, None)
    2515          self.assertRaises(
    2516              NotImplementedError, loop.stop)
    2517          self.assertRaises(
    2518              NotImplementedError, loop.is_running)
    2519          self.assertRaises(
    2520              NotImplementedError, loop.is_closed)
    2521          self.assertRaises(
    2522              NotImplementedError, loop.close)
    2523          self.assertRaises(
    2524              NotImplementedError, loop.create_task, None)
    2525          self.assertRaises(
    2526              NotImplementedError, loop.call_later, None, None)
    2527          self.assertRaises(
    2528              NotImplementedError, loop.call_at, f, f)
    2529          self.assertRaises(
    2530              NotImplementedError, loop.call_soon, None)
    2531          self.assertRaises(
    2532              NotImplementedError, loop.time)
    2533          self.assertRaises(
    2534              NotImplementedError, loop.call_soon_threadsafe, None)
    2535          self.assertRaises(
    2536              NotImplementedError, loop.set_default_executor, f)
    2537          self.assertRaises(
    2538              NotImplementedError, loop.add_reader, 1, f)
    2539          self.assertRaises(
    2540              NotImplementedError, loop.remove_reader, 1)
    2541          self.assertRaises(
    2542              NotImplementedError, loop.add_writer, 1, f)
    2543          self.assertRaises(
    2544              NotImplementedError, loop.remove_writer, 1)
    2545          self.assertRaises(
    2546              NotImplementedError, loop.add_signal_handler, 1, f)
    2547          self.assertRaises(
    2548              NotImplementedError, loop.remove_signal_handler, 1)
    2549          self.assertRaises(
    2550              NotImplementedError, loop.remove_signal_handler, 1)
    2551          self.assertRaises(
    2552              NotImplementedError, loop.set_exception_handler, f)
    2553          self.assertRaises(
    2554              NotImplementedError, loop.default_exception_handler, f)
    2555          self.assertRaises(
    2556              NotImplementedError, loop.call_exception_handler, f)
    2557          self.assertRaises(
    2558              NotImplementedError, loop.get_debug)
    2559          self.assertRaises(
    2560              NotImplementedError, loop.set_debug, f)
    2561  
    2562      def test_not_implemented_async(self):
    2563  
    2564          async def inner():
    2565              f = mock.Mock()
    2566              loop = asyncio.AbstractEventLoop()
    2567  
    2568              with self.assertRaises(NotImplementedError):
    2569                  await loop.run_in_executor(f, f)
    2570              with self.assertRaises(NotImplementedError):
    2571                  await loop.getaddrinfo('localhost', 8080)
    2572              with self.assertRaises(NotImplementedError):
    2573                  await loop.getnameinfo(('localhost', 8080))
    2574              with self.assertRaises(NotImplementedError):
    2575                  await loop.create_connection(f)
    2576              with self.assertRaises(NotImplementedError):
    2577                  await loop.create_server(f)
    2578              with self.assertRaises(NotImplementedError):
    2579                  await loop.create_datagram_endpoint(f)
    2580              with self.assertRaises(NotImplementedError):
    2581                  await loop.sock_recv(f, 10)
    2582              with self.assertRaises(NotImplementedError):
    2583                  await loop.sock_recv_into(f, 10)
    2584              with self.assertRaises(NotImplementedError):
    2585                  await loop.sock_sendall(f, 10)
    2586              with self.assertRaises(NotImplementedError):
    2587                  await loop.sock_connect(f, f)
    2588              with self.assertRaises(NotImplementedError):
    2589                  await loop.sock_accept(f)
    2590              with self.assertRaises(NotImplementedError):
    2591                  await loop.sock_sendfile(f, f)
    2592              with self.assertRaises(NotImplementedError):
    2593                  await loop.sendfile(f, f)
    2594              with self.assertRaises(NotImplementedError):
    2595                  await loop.connect_read_pipe(f, mock.sentinel.pipe)
    2596              with self.assertRaises(NotImplementedError):
    2597                  await loop.connect_write_pipe(f, mock.sentinel.pipe)
    2598              with self.assertRaises(NotImplementedError):
    2599                  await loop.subprocess_shell(f, mock.sentinel)
    2600              with self.assertRaises(NotImplementedError):
    2601                  await loop.subprocess_exec(f)
    2602  
    2603          loop = asyncio.new_event_loop()
    2604          loop.run_until_complete(inner())
    2605          loop.close()
    2606  
    2607  
    2608  class ESC[4;38;5;81mPolicyTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2609  
    2610      def test_event_loop_policy(self):
    2611          policy = asyncio.AbstractEventLoopPolicy()
    2612          self.assertRaises(NotImplementedError, policy.get_event_loop)
    2613          self.assertRaises(NotImplementedError, policy.set_event_loop, object())
    2614          self.assertRaises(NotImplementedError, policy.new_event_loop)
    2615          self.assertRaises(NotImplementedError, policy.get_child_watcher)
    2616          self.assertRaises(NotImplementedError, policy.set_child_watcher,
    2617                            object())
    2618  
    2619      def test_get_event_loop(self):
    2620          policy = asyncio.DefaultEventLoopPolicy()
    2621          self.assertIsNone(policy._local._loop)
    2622          with self.assertWarns(DeprecationWarning) as cm:
    2623              loop = policy.get_event_loop()
    2624          self.assertEqual(cm.filename, __file__)
    2625          self.assertIsInstance(loop, asyncio.AbstractEventLoop)
    2626  
    2627          self.assertIs(policy._local._loop, loop)
    2628          self.assertIs(loop, policy.get_event_loop())
    2629          loop.close()
    2630  
    2631      def test_get_event_loop_calls_set_event_loop(self):
    2632          policy = asyncio.DefaultEventLoopPolicy()
    2633  
    2634          with mock.patch.object(
    2635                  policy, "set_event_loop",
    2636                  wraps=policy.set_event_loop) as m_set_event_loop:
    2637  
    2638              with self.assertWarns(DeprecationWarning) as cm:
    2639                  loop = policy.get_event_loop()
    2640              self.addCleanup(loop.close)
    2641              self.assertEqual(cm.filename, __file__)
    2642  
    2643              # policy._local._loop must be set through .set_event_loop()
    2644              # (the unix DefaultEventLoopPolicy needs this call to attach
    2645              # the child watcher correctly)
    2646              m_set_event_loop.assert_called_with(loop)
    2647  
    2648          loop.close()
    2649  
    2650      def test_get_event_loop_after_set_none(self):
    2651          policy = asyncio.DefaultEventLoopPolicy()
    2652          policy.set_event_loop(None)
    2653          self.assertRaises(RuntimeError, policy.get_event_loop)
    2654  
    2655      @mock.patch('asyncio.events.threading.current_thread')
    2656      def test_get_event_loop_thread(self, m_current_thread):
    2657  
    2658          def f():
    2659              policy = asyncio.DefaultEventLoopPolicy()
    2660              self.assertRaises(RuntimeError, policy.get_event_loop)
    2661  
    2662          th = threading.Thread(target=f)
    2663          th.start()
    2664          th.join()
    2665  
    2666      def test_new_event_loop(self):
    2667          policy = asyncio.DefaultEventLoopPolicy()
    2668  
    2669          loop = policy.new_event_loop()
    2670          self.assertIsInstance(loop, asyncio.AbstractEventLoop)
    2671          loop.close()
    2672  
    2673      def test_set_event_loop(self):
    2674          policy = asyncio.DefaultEventLoopPolicy()
    2675          old_loop = policy.new_event_loop()
    2676          policy.set_event_loop(old_loop)
    2677  
    2678          self.assertRaises(TypeError, policy.set_event_loop, object())
    2679  
    2680          loop = policy.new_event_loop()
    2681          policy.set_event_loop(loop)
    2682          self.assertIs(loop, policy.get_event_loop())
    2683          self.assertIsNot(old_loop, policy.get_event_loop())
    2684          loop.close()
    2685          old_loop.close()
    2686  
    2687      def test_get_event_loop_policy(self):
    2688          policy = asyncio.get_event_loop_policy()
    2689          self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
    2690          self.assertIs(policy, asyncio.get_event_loop_policy())
    2691  
    2692      def test_set_event_loop_policy(self):
    2693          self.assertRaises(
    2694              TypeError, asyncio.set_event_loop_policy, object())
    2695  
    2696          old_policy = asyncio.get_event_loop_policy()
    2697  
    2698          policy = asyncio.DefaultEventLoopPolicy()
    2699          asyncio.set_event_loop_policy(policy)
    2700          self.assertIs(policy, asyncio.get_event_loop_policy())
    2701          self.assertIsNot(policy, old_policy)
    2702  
    2703  
    2704  class ESC[4;38;5;81mGetEventLoopTestsMixin:
    2705  
    2706      _get_running_loop_impl = None
    2707      _set_running_loop_impl = None
    2708      get_running_loop_impl = None
    2709      get_event_loop_impl = None
    2710  
    2711      def setUp(self):
    2712          self._get_running_loop_saved = events._get_running_loop
    2713          self._set_running_loop_saved = events._set_running_loop
    2714          self.get_running_loop_saved = events.get_running_loop
    2715          self.get_event_loop_saved = events.get_event_loop
    2716  
    2717          events._get_running_loop = type(self)._get_running_loop_impl
    2718          events._set_running_loop = type(self)._set_running_loop_impl
    2719          events.get_running_loop = type(self).get_running_loop_impl
    2720          events.get_event_loop = type(self).get_event_loop_impl
    2721  
    2722          asyncio._get_running_loop = type(self)._get_running_loop_impl
    2723          asyncio._set_running_loop = type(self)._set_running_loop_impl
    2724          asyncio.get_running_loop = type(self).get_running_loop_impl
    2725          asyncio.get_event_loop = type(self).get_event_loop_impl
    2726  
    2727          super().setUp()
    2728  
    2729          self.loop = asyncio.new_event_loop()
    2730          asyncio.set_event_loop(self.loop)
    2731  
    2732          if sys.platform != 'win32':
    2733              with warnings.catch_warnings():
    2734                  warnings.simplefilter('ignore', DeprecationWarning)
    2735                  watcher = asyncio.SafeChildWatcher()
    2736                  watcher.attach_loop(self.loop)
    2737                  asyncio.set_child_watcher(watcher)
    2738  
    2739      def tearDown(self):
    2740          try:
    2741              if sys.platform != 'win32':
    2742                  with warnings.catch_warnings():
    2743                      warnings.simplefilter('ignore', DeprecationWarning)
    2744                      asyncio.set_child_watcher(None)
    2745  
    2746              super().tearDown()
    2747          finally:
    2748              self.loop.close()
    2749              asyncio.set_event_loop(None)
    2750  
    2751              events._get_running_loop = self._get_running_loop_saved
    2752              events._set_running_loop = self._set_running_loop_saved
    2753              events.get_running_loop = self.get_running_loop_saved
    2754              events.get_event_loop = self.get_event_loop_saved
    2755  
    2756              asyncio._get_running_loop = self._get_running_loop_saved
    2757              asyncio._set_running_loop = self._set_running_loop_saved
    2758              asyncio.get_running_loop = self.get_running_loop_saved
    2759              asyncio.get_event_loop = self.get_event_loop_saved
    2760  
    2761      if sys.platform != 'win32':
    2762  
    2763          def test_get_event_loop_new_process(self):
    2764              # bpo-32126: The multiprocessing module used by
    2765              # ProcessPoolExecutor is not functional when the
    2766              # multiprocessing.synchronize module cannot be imported.
    2767              support.skip_if_broken_multiprocessing_synchronize()
    2768  
    2769              self.addCleanup(multiprocessing_cleanup_tests)
    2770  
    2771              async def main():
    2772                  if multiprocessing.get_start_method() == 'fork':
    2773                      # Avoid 'fork' DeprecationWarning.
    2774                      mp_context = multiprocessing.get_context('forkserver')
    2775                  else:
    2776                      mp_context = None
    2777                  pool = concurrent.futures.ProcessPoolExecutor(
    2778                          mp_context=mp_context)
    2779                  result = await self.loop.run_in_executor(
    2780                      pool, _test_get_event_loop_new_process__sub_proc)
    2781                  pool.shutdown()
    2782                  return result
    2783  
    2784              self.assertEqual(
    2785                  self.loop.run_until_complete(main()),
    2786                  'hello')
    2787  
    2788      def test_get_event_loop_returns_running_loop(self):
    2789          class ESC[4;38;5;81mTestError(ESC[4;38;5;149mException):
    2790              pass
    2791  
    2792          class ESC[4;38;5;81mPolicy(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mDefaultEventLoopPolicy):
    2793              def get_event_loop(self):
    2794                  raise TestError
    2795  
    2796          old_policy = asyncio.get_event_loop_policy()
    2797          try:
    2798              asyncio.set_event_loop_policy(Policy())
    2799              loop = asyncio.new_event_loop()
    2800  
    2801              with self.assertRaises(TestError):
    2802                  asyncio.get_event_loop()
    2803              asyncio.set_event_loop(None)
    2804              with self.assertRaises(TestError):
    2805                  asyncio.get_event_loop()
    2806  
    2807              with self.assertRaisesRegex(RuntimeError, 'no running'):
    2808                  asyncio.get_running_loop()
    2809              self.assertIs(asyncio._get_running_loop(), None)
    2810  
    2811              async def func():
    2812                  self.assertIs(asyncio.get_event_loop(), loop)
    2813                  self.assertIs(asyncio.get_running_loop(), loop)
    2814                  self.assertIs(asyncio._get_running_loop(), loop)
    2815  
    2816              loop.run_until_complete(func())
    2817  
    2818              asyncio.set_event_loop(loop)
    2819              with self.assertRaises(TestError):
    2820                  asyncio.get_event_loop()
    2821              asyncio.set_event_loop(None)
    2822              with self.assertRaises(TestError):
    2823                  asyncio.get_event_loop()
    2824  
    2825          finally:
    2826              asyncio.set_event_loop_policy(old_policy)
    2827              if loop is not None:
    2828                  loop.close()
    2829  
    2830          with self.assertRaisesRegex(RuntimeError, 'no running'):
    2831              asyncio.get_running_loop()
    2832  
    2833          self.assertIs(asyncio._get_running_loop(), None)
    2834  
    2835      def test_get_event_loop_returns_running_loop2(self):
    2836          old_policy = asyncio.get_event_loop_policy()
    2837          try:
    2838              asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
    2839              loop = asyncio.new_event_loop()
    2840              self.addCleanup(loop.close)
    2841  
    2842              with self.assertWarns(DeprecationWarning) as cm:
    2843                  loop2 = asyncio.get_event_loop()
    2844              self.addCleanup(loop2.close)
    2845              self.assertEqual(cm.filename, __file__)
    2846              asyncio.set_event_loop(None)
    2847              with self.assertRaisesRegex(RuntimeError, 'no current'):
    2848                  asyncio.get_event_loop()
    2849  
    2850              with self.assertRaisesRegex(RuntimeError, 'no running'):
    2851                  asyncio.get_running_loop()
    2852              self.assertIs(asyncio._get_running_loop(), None)
    2853  
    2854              async def func():
    2855                  self.assertIs(asyncio.get_event_loop(), loop)
    2856                  self.assertIs(asyncio.get_running_loop(), loop)
    2857                  self.assertIs(asyncio._get_running_loop(), loop)
    2858  
    2859              loop.run_until_complete(func())
    2860  
    2861              asyncio.set_event_loop(loop)
    2862              self.assertIs(asyncio.get_event_loop(), loop)
    2863  
    2864              asyncio.set_event_loop(None)
    2865              with self.assertRaisesRegex(RuntimeError, 'no current'):
    2866                  asyncio.get_event_loop()
    2867  
    2868          finally:
    2869              asyncio.set_event_loop_policy(old_policy)
    2870              if loop is not None:
    2871                  loop.close()
    2872  
    2873          with self.assertRaisesRegex(RuntimeError, 'no running'):
    2874              asyncio.get_running_loop()
    2875  
    2876          self.assertIs(asyncio._get_running_loop(), None)
    2877  
    2878  
    2879  class ESC[4;38;5;81mTestPyGetEventLoop(ESC[4;38;5;149mGetEventLoopTestsMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2880  
    2881      _get_running_loop_impl = events._py__get_running_loop
    2882      _set_running_loop_impl = events._py__set_running_loop
    2883      get_running_loop_impl = events._py_get_running_loop
    2884      get_event_loop_impl = events._py_get_event_loop
    2885  
    2886  
    2887  try:
    2888      import _asyncio  # NoQA
    2889  except ImportError:
    2890      pass
    2891  else:
    2892  
    2893      class ESC[4;38;5;81mTestCGetEventLoop(ESC[4;38;5;149mGetEventLoopTestsMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2894  
    2895          _get_running_loop_impl = events._c__get_running_loop
    2896          _set_running_loop_impl = events._c__set_running_loop
    2897          get_running_loop_impl = events._c_get_running_loop
    2898          get_event_loop_impl = events._c_get_event_loop
    2899  
    2900  
    2901  class ESC[4;38;5;81mTestServer(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2902  
    2903      def test_get_loop(self):
    2904          loop = asyncio.new_event_loop()
    2905          self.addCleanup(loop.close)
    2906          proto = MyProto(loop)
    2907          server = loop.run_until_complete(loop.create_server(lambda: proto, '0.0.0.0', 0))
    2908          self.assertEqual(server.get_loop(), loop)
    2909          server.close()
    2910          loop.run_until_complete(server.wait_closed())
    2911  
    2912  
    2913  class ESC[4;38;5;81mTestAbstractServer(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2914  
    2915      def test_close(self):
    2916          with self.assertRaises(NotImplementedError):
    2917              events.AbstractServer().close()
    2918  
    2919      def test_wait_closed(self):
    2920          loop = asyncio.new_event_loop()
    2921          self.addCleanup(loop.close)
    2922  
    2923          with self.assertRaises(NotImplementedError):
    2924              loop.run_until_complete(events.AbstractServer().wait_closed())
    2925  
    2926      def test_get_loop(self):
    2927          with self.assertRaises(NotImplementedError):
    2928              events.AbstractServer().get_loop()
    2929  
    2930  
    2931  if __name__ == '__main__':
    2932      unittest.main()