python (3.12.0)

(root)/
lib/
python3.12/
test/
test_asyncio/
test_base_events.py
       1  """Tests for base_events.py"""
       2  
       3  import concurrent.futures
       4  import errno
       5  import math
       6  import socket
       7  import sys
       8  import threading
       9  import time
      10  import unittest
      11  from unittest import mock
      12  
      13  import asyncio
      14  from asyncio import base_events
      15  from asyncio import constants
      16  from test.test_asyncio import utils as test_utils
      17  from test import support
      18  from test.support.script_helper import assert_python_ok
      19  from test.support import os_helper
      20  from test.support import socket_helper
      21  import warnings
      22  
      23  MOCK_ANY = mock.ANY
      24  
      25  
def tearDownModule():
    """unittest module-level teardown: clear the asyncio event loop policy.

    Calls ``asyncio.set_event_loop_policy(None)`` so that whatever policy the
    tests in this module installed is not left in place afterwards.
    """
    asyncio.set_event_loop_policy(None)
      28  
      29  
      30  def mock_socket_module():
      31      m_socket = mock.MagicMock(spec=socket)
      32      for name in (
      33          'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP',
      34          'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton'
      35      ):
      36          if hasattr(socket, name):
      37              setattr(m_socket, name, getattr(socket, name))
      38          else:
      39              delattr(m_socket, name)
      40  
      41      m_socket.socket = mock.MagicMock()
      42      m_socket.socket.return_value = test_utils.mock_nonblocking_socket()
      43  
      44      return m_socket
      45  
      46  
      47  def patch_socket(f):
      48      return mock.patch('asyncio.base_events.socket',
      49                        new_callable=mock_socket_module)(f)
      50  
      51  
      52  class ESC[4;38;5;81mBaseEventTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      53  
      54      def test_ipaddr_info(self):
      55          UNSPEC = socket.AF_UNSPEC
      56          INET = socket.AF_INET
      57          INET6 = socket.AF_INET6
      58          STREAM = socket.SOCK_STREAM
      59          DGRAM = socket.SOCK_DGRAM
      60          TCP = socket.IPPROTO_TCP
      61          UDP = socket.IPPROTO_UDP
      62  
      63          self.assertEqual(
      64              (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
      65              base_events._ipaddr_info('1.2.3.4', 1, INET, STREAM, TCP))
      66  
      67          self.assertEqual(
      68              (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
      69              base_events._ipaddr_info(b'1.2.3.4', 1, INET, STREAM, TCP))
      70  
      71          self.assertEqual(
      72              (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
      73              base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, TCP))
      74  
      75          self.assertEqual(
      76              (INET, DGRAM, UDP, '', ('1.2.3.4', 1)),
      77              base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, UDP))
      78  
      79          # Socket type STREAM implies TCP protocol.
      80          self.assertEqual(
      81              (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
      82              base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, 0))
      83  
      84          # Socket type DGRAM implies UDP protocol.
      85          self.assertEqual(
      86              (INET, DGRAM, UDP, '', ('1.2.3.4', 1)),
      87              base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, 0))
      88  
      89          # No socket type.
      90          self.assertIsNone(
      91              base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0))
      92  
      93          if socket_helper.IPV6_ENABLED:
      94              # IPv4 address with family IPv6.
      95              self.assertIsNone(
      96                  base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP))
      97  
      98              self.assertEqual(
      99                  (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)),
     100                  base_events._ipaddr_info('::3', 1, INET6, STREAM, TCP))
     101  
     102              self.assertEqual(
     103                  (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)),
     104                  base_events._ipaddr_info('::3', 1, UNSPEC, STREAM, TCP))
     105  
     106              # IPv6 address with family IPv4.
     107              self.assertIsNone(
     108                  base_events._ipaddr_info('::3', 1, INET, STREAM, TCP))
     109  
     110              # IPv6 address with zone index.
     111              self.assertIsNone(
     112                  base_events._ipaddr_info('::3%lo0', 1, INET6, STREAM, TCP))
     113  
     114      def test_port_parameter_types(self):
     115          # Test obscure kinds of arguments for "port".
     116          INET = socket.AF_INET
     117          STREAM = socket.SOCK_STREAM
     118          TCP = socket.IPPROTO_TCP
     119  
     120          self.assertEqual(
     121              (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
     122              base_events._ipaddr_info('1.2.3.4', None, INET, STREAM, TCP))
     123  
     124          self.assertEqual(
     125              (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
     126              base_events._ipaddr_info('1.2.3.4', b'', INET, STREAM, TCP))
     127  
     128          self.assertEqual(
     129              (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
     130              base_events._ipaddr_info('1.2.3.4', '', INET, STREAM, TCP))
     131  
     132          self.assertEqual(
     133              (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
     134              base_events._ipaddr_info('1.2.3.4', '1', INET, STREAM, TCP))
     135  
     136          self.assertEqual(
     137              (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
     138              base_events._ipaddr_info('1.2.3.4', b'1', INET, STREAM, TCP))
     139  
     140      @patch_socket
     141      def test_ipaddr_info_no_inet_pton(self, m_socket):
     142          del m_socket.inet_pton
     143          self.assertIsNone(base_events._ipaddr_info('1.2.3.4', 1,
     144                                                     socket.AF_INET,
     145                                                     socket.SOCK_STREAM,
     146                                                     socket.IPPROTO_TCP))
     147  
     148  
     149  class ESC[4;38;5;81mBaseEventLoopTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     150  
     151      def setUp(self):
     152          super().setUp()
     153          self.loop = base_events.BaseEventLoop()
     154          self.loop._selector = mock.Mock()
     155          self.loop._selector.select.return_value = ()
     156          self.set_event_loop(self.loop)
     157  
     158      def test_not_implemented(self):
     159          m = mock.Mock()
     160          self.assertRaises(
     161              NotImplementedError,
     162              self.loop._make_socket_transport, m, m)
     163          self.assertRaises(
     164              NotImplementedError,
     165              self.loop._make_ssl_transport, m, m, m, m)
     166          self.assertRaises(
     167              NotImplementedError,
     168              self.loop._make_datagram_transport, m, m)
     169          self.assertRaises(
     170              NotImplementedError, self.loop._process_events, [])
     171          self.assertRaises(
     172              NotImplementedError, self.loop._write_to_self)
     173          self.assertRaises(
     174              NotImplementedError,
     175              self.loop._make_read_pipe_transport, m, m)
     176          self.assertRaises(
     177              NotImplementedError,
     178              self.loop._make_write_pipe_transport, m, m)
     179          gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m)
     180          with self.assertRaises(NotImplementedError):
     181              gen.send(None)
     182  
     183      def test_close(self):
     184          self.assertFalse(self.loop.is_closed())
     185          self.loop.close()
     186          self.assertTrue(self.loop.is_closed())
     187  
     188          # it should be possible to call close() more than once
     189          self.loop.close()
     190          self.loop.close()
     191  
     192          # operation blocked when the loop is closed
     193          f = self.loop.create_future()
     194          self.assertRaises(RuntimeError, self.loop.run_forever)
     195          self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
     196  
     197      def test__add_callback_handle(self):
     198          h = asyncio.Handle(lambda: False, (), self.loop, None)
     199  
     200          self.loop._add_callback(h)
     201          self.assertFalse(self.loop._scheduled)
     202          self.assertIn(h, self.loop._ready)
     203  
     204      def test__add_callback_cancelled_handle(self):
     205          h = asyncio.Handle(lambda: False, (), self.loop, None)
     206          h.cancel()
     207  
     208          self.loop._add_callback(h)
     209          self.assertFalse(self.loop._scheduled)
     210          self.assertFalse(self.loop._ready)
     211  
     212      def test_set_default_executor(self):
     213          class ESC[4;38;5;81mDummyExecutor(ESC[4;38;5;149mconcurrentESC[4;38;5;149m.ESC[4;38;5;149mfuturesESC[4;38;5;149m.ESC[4;38;5;149mThreadPoolExecutor):
     214              def submit(self, fn, *args, **kwargs):
     215                  raise NotImplementedError(
     216                      'cannot submit into a dummy executor')
     217  
     218          self.loop._process_events = mock.Mock()
     219          self.loop._write_to_self = mock.Mock()
     220  
     221          executor = DummyExecutor()
     222          self.loop.set_default_executor(executor)
     223          self.assertIs(executor, self.loop._default_executor)
     224  
     225      def test_set_default_executor_error(self):
     226          executor = mock.Mock()
     227  
     228          msg = 'executor must be ThreadPoolExecutor instance'
     229          with self.assertRaisesRegex(TypeError, msg):
     230              self.loop.set_default_executor(executor)
     231  
     232          self.assertIsNone(self.loop._default_executor)
     233  
     234      def test_call_soon(self):
     235          def cb():
     236              pass
     237  
     238          h = self.loop.call_soon(cb)
     239          self.assertEqual(h._callback, cb)
     240          self.assertIsInstance(h, asyncio.Handle)
     241          self.assertIn(h, self.loop._ready)
     242  
     243      def test_call_soon_non_callable(self):
     244          self.loop.set_debug(True)
     245          with self.assertRaisesRegex(TypeError, 'a callable object'):
     246              self.loop.call_soon(1)
     247  
     248      def test_call_later(self):
     249          def cb():
     250              pass
     251  
     252          h = self.loop.call_later(10.0, cb)
     253          self.assertIsInstance(h, asyncio.TimerHandle)
     254          self.assertIn(h, self.loop._scheduled)
     255          self.assertNotIn(h, self.loop._ready)
     256          with self.assertRaises(TypeError, msg="delay must not be None"):
     257              self.loop.call_later(None, cb)
     258  
     259      def test_call_later_negative_delays(self):
     260          calls = []
     261  
     262          def cb(arg):
     263              calls.append(arg)
     264  
     265          self.loop._process_events = mock.Mock()
     266          self.loop.call_later(-1, cb, 'a')
     267          self.loop.call_later(-2, cb, 'b')
     268          test_utils.run_briefly(self.loop)
     269          self.assertEqual(calls, ['b', 'a'])
     270  
     271      def test_time_and_call_at(self):
     272          def cb():
     273              self.loop.stop()
     274  
     275          self.loop._process_events = mock.Mock()
     276          delay = 0.1
     277  
     278          when = self.loop.time() + delay
     279          self.loop.call_at(when, cb)
     280          t0 = self.loop.time()
     281          self.loop.run_forever()
     282          dt = self.loop.time() - t0
     283  
     284          # 50 ms: maximum granularity of the event loop
     285          self.assertGreaterEqual(dt, delay - 0.050, dt)
     286          # tolerate a difference of +800 ms because some Python buildbots
     287          # are really slow
     288          self.assertLessEqual(dt, 0.9, dt)
     289          with self.assertRaises(TypeError, msg="when cannot be None"):
     290              self.loop.call_at(None, cb)
     291  
     292      def check_thread(self, loop, debug):
     293          def cb():
     294              pass
     295  
     296          loop.set_debug(debug)
     297          if debug:
     298              msg = ("Non-thread-safe operation invoked on an event loop other "
     299                     "than the current one")
     300              with self.assertRaisesRegex(RuntimeError, msg):
     301                  loop.call_soon(cb)
     302              with self.assertRaisesRegex(RuntimeError, msg):
     303                  loop.call_later(60, cb)
     304              with self.assertRaisesRegex(RuntimeError, msg):
     305                  loop.call_at(loop.time() + 60, cb)
     306          else:
     307              loop.call_soon(cb)
     308              loop.call_later(60, cb)
     309              loop.call_at(loop.time() + 60, cb)
     310  
     311      def test_check_thread(self):
     312          def check_in_thread(loop, event, debug, create_loop, fut):
     313              # wait until the event loop is running
     314              event.wait()
     315  
     316              try:
     317                  if create_loop:
     318                      loop2 = base_events.BaseEventLoop()
     319                      try:
     320                          asyncio.set_event_loop(loop2)
     321                          self.check_thread(loop, debug)
     322                      finally:
     323                          asyncio.set_event_loop(None)
     324                          loop2.close()
     325                  else:
     326                      self.check_thread(loop, debug)
     327              except Exception as exc:
     328                  loop.call_soon_threadsafe(fut.set_exception, exc)
     329              else:
     330                  loop.call_soon_threadsafe(fut.set_result, None)
     331  
     332          def test_thread(loop, debug, create_loop=False):
     333              event = threading.Event()
     334              fut = loop.create_future()
     335              loop.call_soon(event.set)
     336              args = (loop, event, debug, create_loop, fut)
     337              thread = threading.Thread(target=check_in_thread, args=args)
     338              thread.start()
     339              loop.run_until_complete(fut)
     340              thread.join()
     341  
     342          self.loop._process_events = mock.Mock()
     343          self.loop._write_to_self = mock.Mock()
     344  
     345          # raise RuntimeError if the thread has no event loop
     346          test_thread(self.loop, True)
     347  
     348          # check disabled if debug mode is disabled
     349          test_thread(self.loop, False)
     350  
     351          # raise RuntimeError if the event loop of the thread is not the called
     352          # event loop
     353          test_thread(self.loop, True, create_loop=True)
     354  
     355          # check disabled if debug mode is disabled
     356          test_thread(self.loop, False, create_loop=True)
     357  
     358      def test__run_once(self):
     359          h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (),
     360                                   self.loop, None)
     361          h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (),
     362                                   self.loop, None)
     363  
     364          h1.cancel()
     365  
     366          self.loop._process_events = mock.Mock()
     367          self.loop._scheduled.append(h1)
     368          self.loop._scheduled.append(h2)
     369          self.loop._run_once()
     370  
     371          t = self.loop._selector.select.call_args[0][0]
     372          self.assertTrue(9.5 < t < 10.5, t)
     373          self.assertEqual([h2], self.loop._scheduled)
     374          self.assertTrue(self.loop._process_events.called)
     375  
     376      def test_set_debug(self):
     377          self.loop.set_debug(True)
     378          self.assertTrue(self.loop.get_debug())
     379          self.loop.set_debug(False)
     380          self.assertFalse(self.loop.get_debug())
     381  
     382      def test__run_once_schedule_handle(self):
     383          handle = None
     384          processed = False
     385  
     386          def cb(loop):
     387              nonlocal processed, handle
     388              processed = True
     389              handle = loop.call_soon(lambda: True)
     390  
     391          h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,),
     392                                  self.loop, None)
     393  
     394          self.loop._process_events = mock.Mock()
     395          self.loop._scheduled.append(h)
     396          self.loop._run_once()
     397  
     398          self.assertTrue(processed)
     399          self.assertEqual([handle], list(self.loop._ready))
     400  
     401      def test__run_once_cancelled_event_cleanup(self):
     402          self.loop._process_events = mock.Mock()
     403  
     404          self.assertTrue(
     405              0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0)
     406  
     407          def cb():
     408              pass
     409  
     410          # Set up one "blocking" event that will not be cancelled to
     411          # ensure later cancelled events do not make it to the head
     412          # of the queue and get cleaned.
     413          not_cancelled_count = 1
     414          self.loop.call_later(3000, cb)
     415  
     416          # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES)
     417          # cancelled handles, ensure they aren't removed
     418  
     419          cancelled_count = 2
     420          for x in range(2):
     421              h = self.loop.call_later(3600, cb)
     422              h.cancel()
     423  
     424          # Add some cancelled events that will be at head and removed
     425          cancelled_count += 2
     426          for x in range(2):
     427              h = self.loop.call_later(100, cb)
     428              h.cancel()
     429  
     430          # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low
     431          self.assertLessEqual(cancelled_count + not_cancelled_count,
     432              base_events._MIN_SCHEDULED_TIMER_HANDLES)
     433  
     434          self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
     435  
     436          self.loop._run_once()
     437  
     438          cancelled_count -= 2
     439  
     440          self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
     441  
     442          self.assertEqual(len(self.loop._scheduled),
     443              cancelled_count + not_cancelled_count)
     444  
     445          # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION
     446          # so that deletion of cancelled events will occur on next _run_once
     447          add_cancel_count = int(math.ceil(
     448              base_events._MIN_SCHEDULED_TIMER_HANDLES *
     449              base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1
     450  
     451          add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES -
     452              add_cancel_count, 0)
     453  
     454          # Add some events that will not be cancelled
     455          not_cancelled_count += add_not_cancel_count
     456          for x in range(add_not_cancel_count):
     457              self.loop.call_later(3600, cb)
     458  
     459          # Add enough cancelled events
     460          cancelled_count += add_cancel_count
     461          for x in range(add_cancel_count):
     462              h = self.loop.call_later(3600, cb)
     463              h.cancel()
     464  
     465          # Ensure all handles are still scheduled
     466          self.assertEqual(len(self.loop._scheduled),
     467              cancelled_count + not_cancelled_count)
     468  
     469          self.loop._run_once()
     470  
     471          # Ensure cancelled events were removed
     472          self.assertEqual(len(self.loop._scheduled), not_cancelled_count)
     473  
     474          # Ensure only uncancelled events remain scheduled
     475          self.assertTrue(all([not x._cancelled for x in self.loop._scheduled]))
     476  
     477      def test_run_until_complete_type_error(self):
     478          self.assertRaises(TypeError,
     479              self.loop.run_until_complete, 'blah')
     480  
     481      def test_run_until_complete_loop(self):
     482          task = self.loop.create_future()
     483          other_loop = self.new_test_loop()
     484          self.addCleanup(other_loop.close)
     485          self.assertRaises(ValueError,
     486              other_loop.run_until_complete, task)
     487  
     488      def test_run_until_complete_loop_orphan_future_close_loop(self):
     489          class ESC[4;38;5;81mShowStopper(ESC[4;38;5;149mSystemExit):
     490              pass
     491  
     492          async def foo(delay):
     493              await asyncio.sleep(delay)
     494  
     495          def throw():
     496              raise ShowStopper
     497  
     498          self.loop._process_events = mock.Mock()
     499          self.loop.call_soon(throw)
     500          with self.assertRaises(ShowStopper):
     501              self.loop.run_until_complete(foo(0.1))
     502  
     503          # This call fails if run_until_complete does not clean up
     504          # done-callback for the previous future.
     505          self.loop.run_until_complete(foo(0.2))
     506  
     507      def test_subprocess_exec_invalid_args(self):
     508          args = [sys.executable, '-c', 'pass']
     509  
     510          # missing program parameter (empty args)
     511          self.assertRaises(TypeError,
     512              self.loop.run_until_complete, self.loop.subprocess_exec,
     513              asyncio.SubprocessProtocol)
     514  
     515          # expected multiple arguments, not a list
     516          self.assertRaises(TypeError,
     517              self.loop.run_until_complete, self.loop.subprocess_exec,
     518              asyncio.SubprocessProtocol, args)
     519  
     520          # program arguments must be strings, not int
     521          self.assertRaises(TypeError,
     522              self.loop.run_until_complete, self.loop.subprocess_exec,
     523              asyncio.SubprocessProtocol, sys.executable, 123)
     524  
     525          # universal_newlines, shell, bufsize must not be set
     526          self.assertRaises(TypeError,
     527          self.loop.run_until_complete, self.loop.subprocess_exec,
     528              asyncio.SubprocessProtocol, *args, universal_newlines=True)
     529          self.assertRaises(TypeError,
     530              self.loop.run_until_complete, self.loop.subprocess_exec,
     531              asyncio.SubprocessProtocol, *args, shell=True)
     532          self.assertRaises(TypeError,
     533              self.loop.run_until_complete, self.loop.subprocess_exec,
     534              asyncio.SubprocessProtocol, *args, bufsize=4096)
     535  
     536      def test_subprocess_shell_invalid_args(self):
     537          # expected a string, not an int or a list
     538          self.assertRaises(TypeError,
     539              self.loop.run_until_complete, self.loop.subprocess_shell,
     540              asyncio.SubprocessProtocol, 123)
     541          self.assertRaises(TypeError,
     542              self.loop.run_until_complete, self.loop.subprocess_shell,
     543              asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass'])
     544  
     545          # universal_newlines, shell, bufsize must not be set
     546          self.assertRaises(TypeError,
     547              self.loop.run_until_complete, self.loop.subprocess_shell,
     548              asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True)
     549          self.assertRaises(TypeError,
     550              self.loop.run_until_complete, self.loop.subprocess_shell,
     551              asyncio.SubprocessProtocol, 'exit 0', shell=True)
     552          self.assertRaises(TypeError,
     553              self.loop.run_until_complete, self.loop.subprocess_shell,
     554              asyncio.SubprocessProtocol, 'exit 0', bufsize=4096)
     555  
     556      def test_default_exc_handler_callback(self):
     557          self.loop._process_events = mock.Mock()
     558  
     559          def zero_error(fut):
     560              fut.set_result(True)
     561              1/0
     562  
     563          # Test call_soon (events.Handle)
     564          with mock.patch('asyncio.base_events.logger') as log:
     565              fut = self.loop.create_future()
     566              self.loop.call_soon(zero_error, fut)
     567              fut.add_done_callback(lambda fut: self.loop.stop())
     568              self.loop.run_forever()
     569              log.error.assert_called_with(
     570                  test_utils.MockPattern('Exception in callback.*zero'),
     571                  exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
     572  
     573          # Test call_later (events.TimerHandle)
     574          with mock.patch('asyncio.base_events.logger') as log:
     575              fut = self.loop.create_future()
     576              self.loop.call_later(0.01, zero_error, fut)
     577              fut.add_done_callback(lambda fut: self.loop.stop())
     578              self.loop.run_forever()
     579              log.error.assert_called_with(
     580                  test_utils.MockPattern('Exception in callback.*zero'),
     581                  exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
     582  
     583      def test_default_exc_handler_coro(self):
     584          self.loop._process_events = mock.Mock()
     585  
     586          async def zero_error_coro():
     587              await asyncio.sleep(0.01)
     588              1/0
     589  
     590          # Test Future.__del__
     591          with mock.patch('asyncio.base_events.logger') as log:
     592              fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop)
     593              fut.add_done_callback(lambda *args: self.loop.stop())
     594              self.loop.run_forever()
     595              fut = None # Trigger Future.__del__ or futures._TracebackLogger
     596              support.gc_collect()
     597              # Future.__del__ in logs error with an actual exception context
     598              log.error.assert_called_with(
     599                  test_utils.MockPattern('.*exception was never retrieved'),
     600                  exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
     601  
     602      def test_set_exc_handler_invalid(self):
     603          with self.assertRaisesRegex(TypeError, 'A callable object or None'):
     604              self.loop.set_exception_handler('spam')
     605  
     606      def test_set_exc_handler_custom(self):
     607          def zero_error():
     608              1/0
     609  
     610          def run_loop():
     611              handle = self.loop.call_soon(zero_error)
     612              self.loop._run_once()
     613              return handle
     614  
     615          self.loop.set_debug(True)
     616          self.loop._process_events = mock.Mock()
     617  
     618          self.assertIsNone(self.loop.get_exception_handler())
     619          mock_handler = mock.Mock()
     620          self.loop.set_exception_handler(mock_handler)
     621          self.assertIs(self.loop.get_exception_handler(), mock_handler)
     622          handle = run_loop()
     623          mock_handler.assert_called_with(self.loop, {
     624              'exception': MOCK_ANY,
     625              'message': test_utils.MockPattern(
     626                                  'Exception in callback.*zero_error'),
     627              'handle': handle,
     628              'source_traceback': handle._source_traceback,
     629          })
     630          mock_handler.reset_mock()
     631  
     632          self.loop.set_exception_handler(None)
     633          with mock.patch('asyncio.base_events.logger') as log:
     634              run_loop()
     635              log.error.assert_called_with(
     636                          test_utils.MockPattern(
     637                                  'Exception in callback.*zero'),
     638                          exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
     639  
     640          self.assertFalse(mock_handler.called)
     641  
     642      def test_set_exc_handler_broken(self):
     643          def run_loop():
     644              def zero_error():
     645                  1/0
     646              self.loop.call_soon(zero_error)
     647              self.loop._run_once()
     648  
     649          def handler(loop, context):
     650              raise AttributeError('spam')
     651  
     652          self.loop._process_events = mock.Mock()
     653  
     654          self.loop.set_exception_handler(handler)
     655  
     656          with mock.patch('asyncio.base_events.logger') as log:
     657              run_loop()
     658              log.error.assert_called_with(
     659                  test_utils.MockPattern(
     660                      'Unhandled error in exception handler'),
     661                  exc_info=(AttributeError, MOCK_ANY, MOCK_ANY))
     662  
     663      def test_default_exc_handler_broken(self):
     664          _context = None
     665  
     666          class ESC[4;38;5;81mLoop(ESC[4;38;5;149mbase_eventsESC[4;38;5;149m.ESC[4;38;5;149mBaseEventLoop):
     667  
     668              _selector = mock.Mock()
     669              _process_events = mock.Mock()
     670  
     671              def default_exception_handler(self, context):
     672                  nonlocal _context
     673                  _context = context
     674                  # Simulates custom buggy "default_exception_handler"
     675                  raise ValueError('spam')
     676  
     677          loop = Loop()
     678          self.addCleanup(loop.close)
     679          asyncio.set_event_loop(loop)
     680  
     681          def run_loop():
     682              def zero_error():
     683                  1/0
     684              loop.call_soon(zero_error)
     685              loop._run_once()
     686  
     687          with mock.patch('asyncio.base_events.logger') as log:
     688              run_loop()
     689              log.error.assert_called_with(
     690                  'Exception in default exception handler',
     691                  exc_info=True)
     692  
     693          def custom_handler(loop, context):
     694              raise ValueError('ham')
     695  
     696          _context = None
     697          loop.set_exception_handler(custom_handler)
     698          with mock.patch('asyncio.base_events.logger') as log:
     699              run_loop()
     700              log.error.assert_called_with(
     701                  test_utils.MockPattern('Exception in default exception.*'
     702                                         'while handling.*in custom'),
     703                  exc_info=True)
     704  
     705              # Check that original context was passed to default
     706              # exception handler.
     707              self.assertIn('context', _context)
     708              self.assertIs(type(_context['context']['exception']),
     709                            ZeroDivisionError)
     710  
     711      def test_set_task_factory_invalid(self):
     712          with self.assertRaisesRegex(
     713              TypeError, 'task factory must be a callable or None'):
     714  
     715              self.loop.set_task_factory(1)
     716  
     717          self.assertIsNone(self.loop.get_task_factory())
     718  
     719      def test_set_task_factory(self):
     720          self.loop._process_events = mock.Mock()
     721  
     722          class ESC[4;38;5;81mMyTask(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mTask):
     723              pass
     724  
     725          async def coro():
     726              pass
     727  
     728          factory = lambda loop, coro: MyTask(coro, loop=loop)
     729  
     730          self.assertIsNone(self.loop.get_task_factory())
     731          self.loop.set_task_factory(factory)
     732          self.assertIs(self.loop.get_task_factory(), factory)
     733  
     734          task = self.loop.create_task(coro())
     735          self.assertTrue(isinstance(task, MyTask))
     736          self.loop.run_until_complete(task)
     737  
     738          self.loop.set_task_factory(None)
     739          self.assertIsNone(self.loop.get_task_factory())
     740  
     741          task = self.loop.create_task(coro())
     742          self.assertTrue(isinstance(task, asyncio.Task))
     743          self.assertFalse(isinstance(task, MyTask))
     744          self.loop.run_until_complete(task)
     745  
     746      def test_env_var_debug(self):
     747          code = '\n'.join((
     748              'import asyncio',
     749              'loop = asyncio.new_event_loop()',
     750              'print(loop.get_debug())'))
     751  
     752          # Test with -E to not fail if the unit test was run with
     753          # PYTHONASYNCIODEBUG set to a non-empty string
     754          sts, stdout, stderr = assert_python_ok('-E', '-c', code)
     755          self.assertEqual(stdout.rstrip(), b'False')
     756  
     757          sts, stdout, stderr = assert_python_ok('-c', code,
     758                                                 PYTHONASYNCIODEBUG='',
     759                                                 PYTHONDEVMODE='')
     760          self.assertEqual(stdout.rstrip(), b'False')
     761  
     762          sts, stdout, stderr = assert_python_ok('-c', code,
     763                                                 PYTHONASYNCIODEBUG='1',
     764                                                 PYTHONDEVMODE='')
     765          self.assertEqual(stdout.rstrip(), b'True')
     766  
     767          sts, stdout, stderr = assert_python_ok('-E', '-c', code,
     768                                                 PYTHONASYNCIODEBUG='1')
     769          self.assertEqual(stdout.rstrip(), b'False')
     770  
     771          # -X dev
     772          sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
     773                                                 '-c', code)
     774          self.assertEqual(stdout.rstrip(), b'True')
     775  
     776      def test_create_task(self):
     777          class ESC[4;38;5;81mMyTask(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mTask):
     778              pass
     779  
     780          async def test():
     781              pass
     782  
     783          class ESC[4;38;5;81mEventLoop(ESC[4;38;5;149mbase_eventsESC[4;38;5;149m.ESC[4;38;5;149mBaseEventLoop):
     784              def create_task(self, coro):
     785                  return MyTask(coro, loop=loop)
     786  
     787          loop = EventLoop()
     788          self.set_event_loop(loop)
     789  
     790          coro = test()
     791          task = asyncio.ensure_future(coro, loop=loop)
     792          self.assertIsInstance(task, MyTask)
     793  
     794          # make warnings quiet
     795          task._log_destroy_pending = False
     796          coro.close()
     797  
     798      def test_create_task_error_closes_coro(self):
     799          async def test():
     800              pass
     801          loop = asyncio.new_event_loop()
     802          loop.close()
     803          with warnings.catch_warnings(record=True) as w:
     804              with self.assertRaises(RuntimeError):
     805                  asyncio.ensure_future(test(), loop=loop)
     806              self.assertEqual(len(w), 0)
     807  
     808  
     809      def test_create_named_task_with_default_factory(self):
     810          async def test():
     811              pass
     812  
     813          loop = asyncio.new_event_loop()
     814          task = loop.create_task(test(), name='test_task')
     815          try:
     816              self.assertEqual(task.get_name(), 'test_task')
     817          finally:
     818              loop.run_until_complete(task)
     819              loop.close()
     820  
     821      def test_create_named_task_with_custom_factory(self):
     822          def task_factory(loop, coro):
     823              return asyncio.Task(coro, loop=loop)
     824  
     825          async def test():
     826              pass
     827  
     828          loop = asyncio.new_event_loop()
     829          loop.set_task_factory(task_factory)
     830          task = loop.create_task(test(), name='test_task')
     831          try:
     832              self.assertEqual(task.get_name(), 'test_task')
     833          finally:
     834              loop.run_until_complete(task)
     835              loop.close()
     836  
     837      def test_run_forever_keyboard_interrupt(self):
     838          # Python issue #22601: ensure that the temporary task created by
     839          # run_forever() consumes the KeyboardInterrupt and so don't log
     840          # a warning
     841          async def raise_keyboard_interrupt():
     842              raise KeyboardInterrupt
     843  
     844          self.loop._process_events = mock.Mock()
     845          self.loop.call_exception_handler = mock.Mock()
     846  
     847          try:
     848              self.loop.run_until_complete(raise_keyboard_interrupt())
     849          except KeyboardInterrupt:
     850              pass
     851          self.loop.close()
     852          support.gc_collect()
     853  
     854          self.assertFalse(self.loop.call_exception_handler.called)
     855  
     856      def test_run_until_complete_baseexception(self):
     857          # Python issue #22429: run_until_complete() must not schedule a pending
     858          # call to stop() if the future raised a BaseException
     859          async def raise_keyboard_interrupt():
     860              raise KeyboardInterrupt
     861  
     862          self.loop._process_events = mock.Mock()
     863  
     864          with self.assertRaises(KeyboardInterrupt):
     865              self.loop.run_until_complete(raise_keyboard_interrupt())
     866  
     867          def func():
     868              self.loop.stop()
     869              func.called = True
     870          func.called = False
     871          self.loop.call_soon(self.loop.call_soon, func)
     872          self.loop.run_forever()
     873          self.assertTrue(func.called)
     874  
     875      def test_single_selecter_event_callback_after_stopping(self):
     876          # Python issue #25593: A stopped event loop may cause event callbacks
     877          # to run more than once.
     878          event_sentinel = object()
     879          callcount = 0
     880          doer = None
     881  
     882          def proc_events(event_list):
     883              nonlocal doer
     884              if event_sentinel in event_list:
     885                  doer = self.loop.call_soon(do_event)
     886  
     887          def do_event():
     888              nonlocal callcount
     889              callcount += 1
     890              self.loop.call_soon(clear_selector)
     891  
     892          def clear_selector():
     893              doer.cancel()
     894              self.loop._selector.select.return_value = ()
     895  
     896          self.loop._process_events = proc_events
     897          self.loop._selector.select.return_value = (event_sentinel,)
     898  
     899          for i in range(1, 3):
     900              with self.subTest('Loop %d/2' % i):
     901                  self.loop.call_soon(self.loop.stop)
     902                  self.loop.run_forever()
     903                  self.assertEqual(callcount, 1)
     904  
     905      def test_run_once(self):
     906          # Simple test for test_utils.run_once().  It may seem strange
     907          # to have a test for this (the function isn't even used!) but
     908          # it's a de-factor standard API for library tests.  This tests
     909          # the idiom: loop.call_soon(loop.stop); loop.run_forever().
     910          count = 0
     911  
     912          def callback():
     913              nonlocal count
     914              count += 1
     915  
     916          self.loop._process_events = mock.Mock()
     917          self.loop.call_soon(callback)
     918          test_utils.run_once(self.loop)
     919          self.assertEqual(count, 1)
     920  
     921      def test_run_forever_pre_stopped(self):
     922          # Test that the old idiom for pre-stopping the loop works.
     923          self.loop._process_events = mock.Mock()
     924          self.loop.stop()
     925          self.loop.run_forever()
     926          self.loop._selector.select.assert_called_once_with(0)
     927  
    async def leave_unfinalized_asyncgen(self):
        # Create an async generator, iterate it partially, and leave it
        # to be garbage collected.
        # Used in async generator finalization tests.
        # Depends on implementation details of garbage collector. Changes
        # in gc may break this function.
        #
        # Returns the ``status`` dict straight away; its flags are updated
        # later, as the tasks scheduled below get to run:
        #   'started'   - the generator body began executing
        #   'stopped'   - iteration was abandoned at the item 'THREE'
        #   'finalized' - the generator's ``finally`` clause ran
        status = {'started': False,
                  'stopped': False,
                  'finalized': False}

        async def agen():
            status['started'] = True
            try:
                for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']:
                    yield item
            finally:
                # Reached only once the generator is closed or exhausted.
                status['finalized'] = True

        ag = agen()
        ai = ag.__aiter__()

        async def iter_one():
            # Pull a single item; each step runs in its own task, which
            # schedules the next step unless iteration ends here.
            try:
                item = await ai.__anext__()
            except StopAsyncIteration:
                return
            if item == 'THREE':
                # Stop part-way through: 'FOUR' is never requested.
                status['stopped'] = True
                return
            asyncio.create_task(iter_one())

        # Only schedule the first step; callers keep running the loop
        # until status['stopped'] becomes true.
        asyncio.create_task(iter_one())
        return status
     961  
     962      def test_asyncgen_finalization_by_gc(self):
     963          # Async generators should be finalized when garbage collected.
     964          self.loop._process_events = mock.Mock()
     965          self.loop._write_to_self = mock.Mock()
     966          with support.disable_gc():
     967              status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
     968              while not status['stopped']:
     969                  test_utils.run_briefly(self.loop)
     970              self.assertTrue(status['started'])
     971              self.assertTrue(status['stopped'])
     972              self.assertFalse(status['finalized'])
     973              support.gc_collect()
     974              test_utils.run_briefly(self.loop)
     975              self.assertTrue(status['finalized'])
     976  
     977      def test_asyncgen_finalization_by_gc_in_other_thread(self):
     978          # Python issue 34769: If garbage collector runs in another
     979          # thread, async generators will not finalize in debug
     980          # mode.
     981          self.loop._process_events = mock.Mock()
     982          self.loop._write_to_self = mock.Mock()
     983          self.loop.set_debug(True)
     984          with support.disable_gc():
     985              status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
     986              while not status['stopped']:
     987                  test_utils.run_briefly(self.loop)
     988              self.assertTrue(status['started'])
     989              self.assertTrue(status['stopped'])
     990              self.assertFalse(status['finalized'])
     991              self.loop.run_until_complete(
     992                  self.loop.run_in_executor(None, support.gc_collect))
     993              test_utils.run_briefly(self.loop)
     994              self.assertTrue(status['finalized'])
     995  
     996  
     997  class ESC[4;38;5;81mMyProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mProtocol):
     998      done = None
     999  
    1000      def __init__(self, create_future=False):
    1001          self.state = 'INITIAL'
    1002          self.nbytes = 0
    1003          if create_future:
    1004              self.done = asyncio.get_running_loop().create_future()
    1005  
    1006      def _assert_state(self, *expected):
    1007          if self.state not in expected:
    1008              raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
    1009  
    1010      def connection_made(self, transport):
    1011          self.transport = transport
    1012          self._assert_state('INITIAL')
    1013          self.state = 'CONNECTED'
    1014          transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
    1015  
    1016      def data_received(self, data):
    1017          self._assert_state('CONNECTED')
    1018          self.nbytes += len(data)
    1019  
    1020      def eof_received(self):
    1021          self._assert_state('CONNECTED')
    1022          self.state = 'EOF'
    1023  
    1024      def connection_lost(self, exc):
    1025          self._assert_state('CONNECTED', 'EOF')
    1026          self.state = 'CLOSED'
    1027          if self.done:
    1028              self.done.set_result(None)
    1029  
    1030  
    1031  class ESC[4;38;5;81mMyDatagramProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mDatagramProtocol):
    1032      done = None
    1033  
    1034      def __init__(self, create_future=False, loop=None):
    1035          self.state = 'INITIAL'
    1036          self.nbytes = 0
    1037          if create_future:
    1038              self.done = loop.create_future()
    1039  
    1040      def _assert_state(self, expected):
    1041          if self.state != expected:
    1042              raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
    1043  
    1044      def connection_made(self, transport):
    1045          self.transport = transport
    1046          self._assert_state('INITIAL')
    1047          self.state = 'INITIALIZED'
    1048  
    1049      def datagram_received(self, data, addr):
    1050          self._assert_state('INITIALIZED')
    1051          self.nbytes += len(data)
    1052  
    1053      def error_received(self, exc):
    1054          self._assert_state('INITIALIZED')
    1055  
    1056      def connection_lost(self, exc):
    1057          self._assert_state('INITIALIZED')
    1058          self.state = 'CLOSED'
    1059          if self.done:
    1060              self.done.set_result(None)
    1061  
    1062  
    1063  class ESC[4;38;5;81mBaseEventLoopWithSelectorTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1064  
    def setUp(self):
        # Every test in this class runs against a fresh selector loop.
        super().setUp()
        selector_loop = asyncio.SelectorEventLoop()
        self.loop = selector_loop
        self.set_event_loop(selector_loop)
    1069  
    @mock.patch('socket.getnameinfo')
    def test_getnameinfo(self, m_gai):
        # loop.getnameinfo() hands back whatever socket.getnameinfo()
        # returned.
        def fake_getnameinfo(*args):
            return 42

        m_gai.side_effect = fake_getnameinfo
        lookup = self.loop.getnameinfo(('abc', 123))
        result = self.loop.run_until_complete(lookup)
        self.assertEqual(result, 42)
    1075  
    @patch_socket
    def test_create_connection_multiple_errors(self, m_socket):
        # Socket creation fails for both resolved addresses: by default
        # the errors are folded into a single OSError, with
        # all_errors=True they are raised as an ExceptionGroup.

        class MyProto(asyncio.Protocol):
            pass

        async def getaddrinfo(*args, **kw):
            return [(2, 1, 6, '', ('107.6.106.82', 80)),
                    (2, 1, 6, '', ('107.6.106.82', 80))]

        def getaddrinfo_task(*args, **kwds):
            return self.loop.create_task(getaddrinfo(*args, **kwds))

        errors = ['err1', 'err2']
        idx = -1

        def _socket(*args, **kw):
            # Each attempt fails with the next message from ``errors``.
            nonlocal idx
            idx += 1
            raise OSError(errors[idx])

        m_socket.socket = _socket
        self.loop.getaddrinfo = getaddrinfo_task

        attempt = self.loop.create_connection(MyProto, 'example.com', 80)
        with self.assertRaises(OSError) as cm:
            self.loop.run_until_complete(attempt)

        self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2')

        # Rewind the error sequence for the second attempt.
        idx = -1
        attempt = self.loop.create_connection(
            MyProto, 'example.com', 80, all_errors=True)
        with self.assertRaises(ExceptionGroup) as cm:
            self.loop.run_until_complete(attempt)

        group = cm.exception
        self.assertIsInstance(group, ExceptionGroup)
        for member in group.exceptions:
            self.assertIsInstance(member, OSError)
    1115  
    @patch_socket
    def test_create_connection_timeout(self, m_socket):
        # Ensure that the socket is closed on timeout
        fake_sock = mock.Mock()
        m_socket.socket.return_value = fake_sock

        def getaddrinfo(*args, **kw):
            # Resolve immediately to a single IPv4 stream address.
            resolved = self.loop.create_future()
            entry = (socket.AF_INET, socket.SOCK_STREAM, 0, '',
                     ('127.0.0.1', 80))
            resolved.set_result([entry])
            return resolved

        self.loop.getaddrinfo = getaddrinfo

        connect_patch = mock.patch.object(
            self.loop, 'sock_connect', side_effect=asyncio.TimeoutError)
        with connect_patch:
            attempt = self.loop.create_connection(MyProto, '127.0.0.1', 80)
            with self.assertRaises(asyncio.TimeoutError):
                self.loop.run_until_complete(attempt)
            self.assertTrue(fake_sock.close.called)
    1136  
    def test_create_connection_host_port_sock(self):
        # Passing host/port together with sock is rejected.
        attempt = self.loop.create_connection(
            MyProto, 'example.com', 80, sock=object())
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(attempt)
    1141  
    def test_create_connection_wrong_sock(self):
        # A datagram socket is not acceptable for a stream connection.
        expected_message = 'A Stream Socket was expected'
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            attempt = self.loop.create_connection(MyProto, sock=sock)
            with self.assertRaisesRegex(ValueError, expected_message):
                self.loop.run_until_complete(attempt)
    1149  
    def test_create_server_wrong_sock(self):
        # A datagram socket is not acceptable for a stream server.
        expected_message = 'A Stream Socket was expected'
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            attempt = self.loop.create_server(MyProto, sock=sock)
            with self.assertRaisesRegex(ValueError, expected_message):
                self.loop.run_until_complete(attempt)
    1157  
    def test_create_server_ssl_timeout_for_plain_socket(self):
        # ssl_handshake_timeout without ssl is rejected.
        expected_message = 'ssl_handshake_timeout is only meaningful with ssl'
        attempt = self.loop.create_server(
            MyProto, 'example.com', 80, ssl_handshake_timeout=1)
        with self.assertRaisesRegex(ValueError, expected_message):
            self.loop.run_until_complete(attempt)
    1165  
    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
                         'no socket.SOCK_NONBLOCK (linux only)')
    def test_create_server_stream_bittype(self):
        # A stream socket created with extra type bits (SOCK_NONBLOCK)
        # is still accepted by create_server().
        sock_type = socket.SOCK_STREAM | socket.SOCK_NONBLOCK
        with socket.socket(socket.AF_INET, sock_type) as sock:
            starting = self.loop.create_server(lambda: None, sock=sock)
            srv = self.loop.run_until_complete(starting)
            srv.close()
            self.loop.run_until_complete(srv.wait_closed())
    1176  
    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
    def test_create_server_ipv6(self):
        # A server started on ::1 exposes at least one socket.
        async def main():
            srv = await asyncio.start_server(lambda: None, '::1', 0)
            try:
                self.assertGreater(len(srv.sockets), 0)
            finally:
                srv.close()
                await srv.wait_closed()

        try:
            self.loop.run_until_complete(main())
        except OSError as ex:
            # Only "address not available" turns into a skip; any other
            # OSError is a genuine failure.
            if not hasattr(errno, 'EADDRNOTAVAIL'):
                raise
            if ex.errno != errno.EADDRNOTAVAIL:
                raise
            self.skipTest('failed to bind to ::1')
    1195  
    def test_create_datagram_endpoint_wrong_sock(self):
        # A socket created with the default type is not acceptable for a
        # datagram endpoint.
        expected_message = 'A UDP Socket was expected'
        with socket.socket(socket.AF_INET) as sock:
            attempt = self.loop.create_datagram_endpoint(MyProto, sock=sock)
            with self.assertRaisesRegex(ValueError, expected_message):
                self.loop.run_until_complete(attempt)
    1203  
    def test_create_connection_no_host_port_sock(self):
        # With neither host/port nor sock there is nothing to connect to.
        attempt = self.loop.create_connection(MyProto)
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(attempt)
    1207  
    def test_create_connection_no_getaddrinfo(self):
        # An empty name-resolution result makes the connection fail with
        # OSError.
        async def resolve_nothing(*args, **kw):
            return []

        def getaddrinfo_task(*args, **kwds):
            return self.loop.create_task(resolve_nothing(*args, **kwds))

        self.loop.getaddrinfo = getaddrinfo_task
        attempt = self.loop.create_connection(MyProto, 'example.com', 80)
        with self.assertRaises(OSError):
            self.loop.run_until_complete(attempt)
    1219  
    def test_create_connection_connect_err(self):
        # A failing sock_connect() for the only resolved address surfaces
        # as OSError, or as a one-element ExceptionGroup with
        # all_errors=True.
        async def resolve(*args, **kw):
            return [(2, 1, 6, '', ('107.6.106.82', 80))]

        def getaddrinfo_task(*args, **kwds):
            return self.loop.create_task(resolve(*args, **kwds))

        failing_connect = mock.Mock()
        failing_connect.side_effect = OSError
        self.loop.getaddrinfo = getaddrinfo_task
        self.loop.sock_connect = failing_connect

        attempt = self.loop.create_connection(MyProto, 'example.com', 80)
        with self.assertRaises(OSError):
            self.loop.run_until_complete(attempt)

        attempt = self.loop.create_connection(
            MyProto, 'example.com', 80, all_errors=True)
        with self.assertRaises(ExceptionGroup) as cm:
            self.loop.run_until_complete(attempt)

        group = cm.exception
        self.assertIsInstance(group, ExceptionGroup)
        self.assertEqual(len(group.exceptions), 1)
        self.assertIsInstance(group.exceptions[0], OSError)
    1242  
    def test_create_connection_multiple(self):
        # sock_connect() fails for each of the two resolved addresses:
        # OSError by default, an ExceptionGroup of OSErrors with
        # all_errors=True.
        async def resolve(*args, **kw):
            return [(2, 1, 6, '', ('0.0.0.1', 80)),
                    (2, 1, 6, '', ('0.0.0.2', 80))]

        def getaddrinfo_task(*args, **kwds):
            return self.loop.create_task(resolve(*args, **kwds))

        failing_connect = mock.Mock()
        failing_connect.side_effect = OSError
        self.loop.getaddrinfo = getaddrinfo_task
        self.loop.sock_connect = failing_connect

        attempt = self.loop.create_connection(
            MyProto, 'example.com', 80, family=socket.AF_INET)
        with self.assertRaises(OSError):
            self.loop.run_until_complete(attempt)

        attempt = self.loop.create_connection(
            MyProto, 'example.com', 80, family=socket.AF_INET,
            all_errors=True)
        with self.assertRaises(ExceptionGroup) as cm:
            self.loop.run_until_complete(attempt)

        group = cm.exception
        self.assertIsInstance(group, ExceptionGroup)
        for member in group.exceptions:
            self.assertIsInstance(member, OSError)
    1268  
    @patch_socket
    def test_create_connection_multiple_errors_local_addr(self, m_socket):
        # With a local address requested, bind() fails for one address
        # and sock_connect() fails otherwise; the errors are combined and
        # the socket is closed.

        def bind(addr):
            if addr[0] != '0.0.0.1':
                return
            failure = OSError('Err')
            failure.strerror = 'Err'
            raise failure

        m_socket.socket.return_value.bind = bind

        async def resolve(*args, **kw):
            return [(2, 1, 6, '', ('0.0.0.1', 80)),
                    (2, 1, 6, '', ('0.0.0.2', 80))]

        def getaddrinfo_task(*args, **kwds):
            return self.loop.create_task(resolve(*args, **kwds))

        failing_connect = mock.Mock()
        failing_connect.side_effect = OSError('Err2')
        self.loop.getaddrinfo = getaddrinfo_task
        self.loop.sock_connect = failing_connect

        attempt = self.loop.create_connection(
            MyProto, 'example.com', 80, family=socket.AF_INET,
            local_addr=(None, 8080))
        with self.assertRaises(OSError) as cm:
            self.loop.run_until_complete(attempt)

        message = str(cm.exception)
        self.assertTrue(message.startswith('Multiple exceptions: '))
        self.assertTrue(m_socket.socket.return_value.close.called)

        attempt = self.loop.create_connection(
            MyProto, 'example.com', 80, family=socket.AF_INET,
            local_addr=(None, 8080), all_errors=True)
        with self.assertRaises(ExceptionGroup) as cm:
            self.loop.run_until_complete(attempt)

        group = cm.exception
        self.assertIsInstance(group, ExceptionGroup)
        for member in group.exceptions:
            self.assertIsInstance(member, OSError)
    1309  
    def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton):
        # Shared body for the literal-IP connection tests: connect to an
        # IPv4 literal and, when IPv6 is enabled, to an IPv6 literal, and
        # check the address and socket arguments that were used.

        # Test the fallback code, even if this system has inet_pton.
        if not allow_inet_pton:
            del m_socket.inet_pton

        m_socket.getaddrinfo = socket.getaddrinfo
        sock = m_socket.socket.return_value

        self.loop._add_reader = mock.Mock()
        self.loop._add_writer = mock.Mock()

        connecting = self.loop.create_connection(
            asyncio.Protocol, '1.2.3.4', 80)
        transport, _protocol = self.loop.run_until_complete(connecting)
        try:
            sock.connect.assert_called_with(('1.2.3.4', 80))
            socket_kwargs = m_socket.socket.call_args[1]
            self.assertEqual(socket_kwargs['family'], m_socket.AF_INET)
            self.assertEqual(socket_kwargs['type'], m_socket.SOCK_STREAM)
        finally:
            transport.close()
            test_utils.run_briefly(self.loop)  # allow transport to close

        if not socket_helper.IPV6_ENABLED:
            return

        sock.family = socket.AF_INET6
        connecting = self.loop.create_connection(asyncio.Protocol, '::1', 80)
        transport, _protocol = self.loop.run_until_complete(connecting)
        try:
            # Without inet_pton we use getaddrinfo, which transforms
            # ('::1', 80) to ('::1', 80, 0, 0). The last 0s are flow info,
            # scope id.
            [address] = sock.connect.call_args[0]
            host, port = address[:2]
            self.assertRegex(host, r'::(0\.)*1')
            self.assertEqual(port, 80)
            socket_kwargs = m_socket.socket.call_args[1]
            self.assertEqual(socket_kwargs['family'], m_socket.AF_INET6)
            self.assertEqual(socket_kwargs['type'], m_socket.SOCK_STREAM)
        finally:
            transport.close()
            test_utils.run_briefly(self.loop)  # allow transport to close
    1350  
    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
    @unittest.skipIf(sys.platform.startswith('aix'),
                    "bpo-25545: IPv6 scope id and getaddrinfo() behave differently on AIX")
    @patch_socket
    def test_create_connection_ipv6_scope(self, m_socket):
        # The "%1" suffix of the host ends up as the scope id, i.e. the
        # fourth element of the address passed to connect().
        m_socket.getaddrinfo = socket.getaddrinfo
        sock = m_socket.socket.return_value
        sock.family = socket.AF_INET6

        self.loop._add_reader = mock.Mock()
        self.loop._add_writer = mock.Mock()

        connecting = self.loop.create_connection(
            asyncio.Protocol, 'fe80::1%1', 80)
        transport, _protocol = self.loop.run_until_complete(connecting)
        try:
            sock.connect.assert_called_with(('fe80::1', 80, 0, 1))
            socket_kwargs = m_socket.socket.call_args[1]
            self.assertEqual(socket_kwargs['family'], m_socket.AF_INET6)
            self.assertEqual(socket_kwargs['type'], m_socket.SOCK_STREAM)
        finally:
            transport.close()
            test_utils.run_briefly(self.loop)  # allow transport to close
    1373  
    @patch_socket
    def test_create_connection_ip_addr(self, m_socket):
        # Run the shared IP-address scenario with the helper's flag set to
        # True.  NOTE(review): the flag presumably means "inet_pton is
        # available" (the sibling *_no_inet_pton test passes False) --
        # confirm against the helper.
        with_inet_pton = True
        self._test_create_connection_ip_addr(m_socket, with_inet_pton)
    1377  
    @patch_socket
    def test_create_connection_no_inet_pton(self, m_socket):
        # Run the shared IP-address scenario with the helper's flag set to
        # False.  NOTE(review): going by this test's name the flag
        # presumably disables inet_pton -- confirm against the helper.
        with_inet_pton = False
        self._test_create_connection_ip_addr(m_socket, with_inet_pton)
    1381  
    @patch_socket
    def test_create_connection_service_name(self, m_socket):
        # The port may be given as a service name (str or bytes): 'http'
        # must arrive at connect() as port 80, and an unknown service name
        # must surface as OSError.  Only the socket object is mocked;
        # getaddrinfo() is the real one.
        m_socket.getaddrinfo = socket.getaddrinfo
        fake_sock = m_socket.socket.return_value

        # Stub out the loop's reader/writer registration hooks.
        self.loop._add_reader = mock.Mock()
        self.loop._add_writer = mock.Mock()

        http_port = 80
        for service in ('http', b'http'):
            transport, _protocol = self.loop.run_until_complete(
                self.loop.create_connection(asyncio.Protocol,
                                            '127.0.0.1', service))
            try:
                fake_sock.connect.assert_called_with(('127.0.0.1', http_port))
                socket_kwargs = m_socket.socket.call_args.kwargs
                self.assertEqual(socket_kwargs['family'], m_socket.AF_INET)
                self.assertEqual(socket_kwargs['type'], m_socket.SOCK_STREAM)
            finally:
                transport.close()
                test_utils.run_briefly(self.loop)  # allow transport to close

        for service in ('nonsense', b'nonsense'):
            coro = self.loop.create_connection(asyncio.Protocol,
                                               '127.0.0.1', service)
            with self.assertRaises(OSError):
                self.loop.run_until_complete(coro)
    1410  
    def test_create_connection_no_local_addr(self):
        # The stub resolver only knows 'example.com'; any other host,
        # including the None host of local_addr=(None, 8080), gets an empty
        # result.  create_connection() is expected to fail with OSError.
        async def fake_getaddrinfo(host, *args, **kw):
            if host != 'example.com':
                return []
            entry = (2, 1, 6, '', ('107.6.106.82', 80))
            return [entry, entry]

        # The loop's getaddrinfo() is replaced by a callable returning a Task.
        self.loop.getaddrinfo = (
            lambda *args, **kwds:
                self.loop.create_task(fake_getaddrinfo(*args, **kwds)))

        coro = self.loop.create_connection(
            MyProto, 'example.com', 80, family=socket.AF_INET,
            local_addr=(None, 8080))
        with self.assertRaises(OSError):
            self.loop.run_until_complete(coro)
    1428  
    @patch_socket
    def test_create_connection_bluetooth(self, m_socket):
        # See http://bugs.python.org/issue27136, fallback to getaddrinfo when
        # we can't recognize an address is resolved, e.g. a Bluetooth address.
        bt_addr = ('00:01:02:03:04:05', 1)

        def fake_getaddrinfo(host, port, *args, **kw):
            # The address must be handed to the resolver unchanged.
            self.assertEqual((host, port), bt_addr)
            return [(999, 1, 999, '', (bt_addr, 1))]

        m_socket.getaddrinfo = fake_getaddrinfo
        fake_sock = m_socket.socket()
        # Despite the test name, the call under test is loop.sock_connect().
        self.loop.run_until_complete(
            self.loop.sock_connect(fake_sock, bt_addr))
    1443  
    def test_create_connection_ssl_server_hostname_default(self):
        # Check which server_hostname create_connection() forwards to the
        # loop's _make_ssl_transport(): the host when none is given, else
        # the explicit value (including an explicit empty string).
        # Resolution, connecting and the SSL transport are all stubbed.
        def resolved_addrinfo(*args, **kwds):
            # A fresh, already-completed future per call.
            fut = self.loop.create_future()
            fut.set_result([(socket.AF_INET, socket.SOCK_STREAM,
                             socket.SOL_TCP, '', ('1.2.3.4', 80))])
            return fut

        self.loop.getaddrinfo = mock.Mock(side_effect=resolved_addrinfo)

        connected = self.loop.create_future()
        connected.set_result(None)
        self.loop.sock_connect = mock.Mock(return_value=connected)

        class _SelectorTransportMock:
            _sock = None

            def get_extra_info(self, key):
                return mock.Mock()

            def close(self):
                self._sock.close()

        def fake_make_ssl_transport(sock, protocol, sslcontext, waiter,
                                    **kwds):
            # Complete the waiter at once so create_connection() returns.
            waiter.set_result(None)
            transport = _SelectorTransportMock()
            transport._sock = sock
            return transport

        make_ssl = mock.Mock(side_effect=fake_make_ssl_transport)
        self.loop._make_ssl_transport = make_ssl

        ANY = mock.ANY
        handshake_timeout = object()
        shutdown_timeout = object()

        scenarios = (
            # (extra create_connection() kwargs, hostname the factory sees)
            ({}, 'python.org'),                            # default
            ({'server_hostname': 'perl.com'}, 'perl.com'), # explicit
            ({'server_hostname': ''}, ''),                 # explicit, empty
        )
        for extra_kwargs, expected_hostname in scenarios:
            make_ssl.reset_mock()
            coro = self.loop.create_connection(
                MyProto, 'python.org', 80, ssl=True,
                ssl_handshake_timeout=handshake_timeout,
                ssl_shutdown_timeout=shutdown_timeout,
                **extra_kwargs)
            transport, _ = self.loop.run_until_complete(coro)
            transport.close()
            make_ssl.assert_called_with(
                ANY, ANY, ANY, ANY,
                server_side=False,
                server_hostname=expected_hostname,
                ssl_handshake_timeout=handshake_timeout,
                ssl_shutdown_timeout=shutdown_timeout)
    1523  
    def test_create_connection_no_ssl_server_hostname_errors(self):
        # When not using ssl, server_hostname must be None.
        for hostname in ('', 'python.org'):
            coro = self.loop.create_connection(MyProto, 'python.org', 80,
                                               server_hostname=hostname)
            with self.assertRaises(ValueError):
                self.loop.run_until_complete(coro)
    1532  
    def test_create_connection_ssl_server_hostname_errors(self):
        # When using ssl, server_hostname may be None if host is non-empty.
        for empty_host in ('', None):
            coro = self.loop.create_connection(
                MyProto, empty_host, 80, ssl=True)
            with self.assertRaises(ValueError):
                self.loop.run_until_complete(coro)

        # Same expectation when a ready-made socket replaces host/port.
        sock = socket.socket()
        self.addCleanup(sock.close)
        coro = self.loop.create_connection(MyProto, None, None,
                                           ssl=True, sock=sock)
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(coro)
    1544  
    def test_create_connection_ssl_timeout_for_plain_socket(self):
        # Passing ssl_handshake_timeout without ssl must be rejected with a
        # ValueError carrying the message checked below.
        coro = self.loop.create_connection(
            MyProto, 'example.com', 80, ssl_handshake_timeout=1)
        self.assertRaisesRegex(
            ValueError,
            'ssl_handshake_timeout is only meaningful with ssl',
            self.loop.run_until_complete, coro)
    1552  
    def test_create_server_empty_host(self):
        # if host is empty string use None instead
        seen_host = object()  # sentinel, replaced when the resolver runs

        async def recording_getaddrinfo(*args, **kw):
            # Record the host and resolve to nothing, so create_server()
            # fails with OSError as asserted below.
            nonlocal seen_host
            seen_host = args[0]
            return []

        self.loop.getaddrinfo = (
            lambda *args, **kwds:
                self.loop.create_task(recording_getaddrinfo(*args, **kwds)))

        server_coro = self.loop.create_server(MyProto, '', 0)
        with self.assertRaises(OSError):
            self.loop.run_until_complete(server_coro)
        self.assertIsNone(seen_host)
    1569  
    def test_create_server_host_port_sock(self):
        # Passing host/port together with sock must raise ValueError.
        server_coro = self.loop.create_server(
            MyProto, '0.0.0.0', 0, sock=object())
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(server_coro)
    1574  
    def test_create_server_no_host_port_sock(self):
        # Passing neither host/port nor sock must raise ValueError.
        server_coro = self.loop.create_server(MyProto)
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(server_coro)
    1578  
    def test_create_server_no_getaddrinfo(self):
        # A resolver whose future completes with None must make
        # create_server() fail with OSError.
        resolved = self.loop.create_future()
        resolved.set_result(None)
        self.loop.getaddrinfo = mock.Mock(return_value=resolved)

        server_coro = self.loop.create_server(MyProto, 'python.org', 0)
        with self.assertRaises(OSError):
            self.loop.run_until_complete(server_coro)
    1586  
    @patch_socket
    def test_create_server_nosoreuseport(self, m_socket):
        # reuse_port=True must raise ValueError when the socket module has
        # no SO_REUSEPORT attribute.
        m_socket.getaddrinfo = socket.getaddrinfo
        delattr(m_socket, 'SO_REUSEPORT')
        m_socket.socket.return_value = mock.Mock()

        server_coro = self.loop.create_server(
            MyProto, '0.0.0.0', 0, reuse_port=True)

        with self.assertRaises(ValueError):
            self.loop.run_until_complete(server_coro)
    1597  
    @patch_socket
    def test_create_server_soreuseport_only_defined(self, m_socket):
        # SO_REUSEPORT exists on the (mocked) socket module, here with the
        # value -1, yet reuse_port=True is still expected to raise
        # ValueError.
        m_socket.getaddrinfo = socket.getaddrinfo
        m_socket.SO_REUSEPORT = -1
        m_socket.socket.return_value = mock.Mock()

        server_coro = self.loop.create_server(
            MyProto, '0.0.0.0', 0, reuse_port=True)

        with self.assertRaises(ValueError):
            self.loop.run_until_complete(server_coro)
    1608  
    @patch_socket
    def test_create_server_cant_bind(self, m_socket):
        # A bind() failure must propagate as OSError, and the socket that
        # failed to bind must have been closed.

        class Err(OSError):
            strerror = 'error'

        listener = mock.Mock()
        listener.bind.side_effect = Err
        m_socket.socket.return_value = listener
        m_socket.getaddrinfo.return_value = [
            (2, 1, 6, '', ('127.0.0.1', 10100))]

        server_coro = self.loop.create_server(MyProto, '0.0.0.0', 0)
        with self.assertRaises(OSError):
            self.loop.run_until_complete(server_coro)
        self.assertTrue(listener.close.called)
    1623  
    @patch_socket
    def test_create_datagram_endpoint_no_addrinfo(self, m_socket):
        # An empty getaddrinfo() result for local_addr must end in OSError.
        m_socket.getaddrinfo.return_value = []

        endpoint_coro = self.loop.create_datagram_endpoint(
            MyDatagramProto, local_addr=('localhost', 0))
        with self.assertRaises(OSError):
            self.loop.run_until_complete(endpoint_coro)
    1632  
    def test_create_datagram_endpoint_addr_error(self):
        # A local_addr that is a bare string or a 4-item tuple must be
        # rejected with TypeError.
        malformed_addrs = ('localhost', ('localhost', 1, 2, 3))
        for local_addr in malformed_addrs:
            endpoint_coro = self.loop.create_datagram_endpoint(
                MyDatagramProto, local_addr=local_addr)
            with self.assertRaises(TypeError):
                self.loop.run_until_complete(endpoint_coro)
    1642  
    def test_create_datagram_endpoint_connect_err(self):
        # An OSError raised by the loop's sock_connect() must propagate out
        # of create_datagram_endpoint().
        self.loop.sock_connect = mock.Mock(side_effect=OSError)

        endpoint_coro = self.loop.create_datagram_endpoint(
            asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0))
        with self.assertRaises(OSError):
            self.loop.run_until_complete(endpoint_coro)
    1651  
    def test_create_datagram_endpoint_allow_broadcast(self):
        # With allow_broadcast=True the endpoint must come up without the
        # loop's sock_connect() being called, and must close cleanly.
        protocol = MyDatagramProto(create_future=True, loop=self.loop)
        connect_spy = mock.Mock(return_value=[])
        self.loop.sock_connect = connect_spy

        transport, _ = self.loop.run_until_complete(
            self.loop.create_datagram_endpoint(
                lambda: protocol,
                remote_addr=('127.0.0.1', 0),
                allow_broadcast=True))
        self.assertFalse(connect_spy.called)

        transport.close()
        self.loop.run_until_complete(protocol.done)
        self.assertEqual('CLOSED', protocol.state)
    1668  
    @patch_socket
    def test_create_datagram_endpoint_socket_err(self, m_socket):
        # If socket.socket() raises OSError, the error must propagate,
        # both for a family-only endpoint and for one with a local_addr.
        m_socket.getaddrinfo = socket.getaddrinfo
        m_socket.socket.side_effect = OSError

        endpoint_variants = (
            {'family': socket.AF_INET},
            {'local_addr': ('127.0.0.1', 0)},
        )
        for endpoint_kwargs in endpoint_variants:
            endpoint_coro = self.loop.create_datagram_endpoint(
                asyncio.DatagramProtocol, **endpoint_kwargs)
            with self.assertRaises(OSError):
                self.loop.run_until_complete(endpoint_coro)
    1683  
    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
    def test_create_datagram_endpoint_no_matching_family(self):
        # An IPv4 remote_addr combined with an IPv6 local_addr must be
        # rejected with ValueError.
        endpoint_coro = self.loop.create_datagram_endpoint(
            asyncio.DatagramProtocol,
            remote_addr=('127.0.0.1', 0), local_addr=('::1', 0))
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(endpoint_coro)
    1691  
    @patch_socket
    def test_create_datagram_endpoint_setblk_err(self, m_socket):
        # If setblocking() raises OSError, the error must propagate and
        # the freshly created socket must have been closed.
        fake_sock = m_socket.socket.return_value
        fake_sock.setblocking.side_effect = OSError

        endpoint_coro = self.loop.create_datagram_endpoint(
            asyncio.DatagramProtocol, family=socket.AF_INET)
        with self.assertRaises(OSError):
            self.loop.run_until_complete(endpoint_coro)
        self.assertTrue(fake_sock.close.called)
    1702  
    def test_create_datagram_endpoint_noaddr_nofamily(self):
        # With no address and no family given, ValueError is expected.
        endpoint_coro = self.loop.create_datagram_endpoint(
            asyncio.DatagramProtocol)
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(endpoint_coro)
    1707  
    @patch_socket
    def test_create_datagram_endpoint_cant_bind(self, m_socket):
        # The exact exception raised by bind() must propagate, and the
        # socket that failed to bind must have been closed.
        class Err(OSError):
            pass

        m_socket.getaddrinfo = socket.getaddrinfo
        fake_sock = mock.Mock()
        fake_sock.bind.side_effect = Err
        m_socket.socket.return_value = fake_sock

        endpoint_coro = self.loop.create_datagram_endpoint(
            MyDatagramProto,
            local_addr=('127.0.0.1', 0), family=socket.AF_INET)
        with self.assertRaises(Err):
            self.loop.run_until_complete(endpoint_coro)
        self.assertTrue(fake_sock.close.called)
    1722  
    def test_create_datagram_endpoint_sock(self):
        # A caller-supplied, already-bound UDP socket must be accepted and
        # the resulting transport must close cleanly.
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Safety net: if bind() or endpoint creation fails, no transport
        # exists to close this real socket.  Closing it a second time
        # after the transport has closed it is harmless.
        self.addCleanup(sock.close)
        sock.bind(('127.0.0.1', 0))
        fut = self.loop.create_datagram_endpoint(
            lambda: MyDatagramProto(create_future=True, loop=self.loop),
            sock=sock)
        transport, protocol = self.loop.run_until_complete(fut)
        transport.close()
        self.loop.run_until_complete(protocol.done)
        self.assertEqual('CLOSED', protocol.state)
    1733  
    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
    def test_create_datagram_endpoint_sock_unix(self):
        # Asking only for family=AF_UNIX must give a transport whose
        # socket is an AF_UNIX socket, and the transport must close cleanly.
        def make_protocol():
            return MyDatagramProto(create_future=True, loop=self.loop)

        transport, protocol = self.loop.run_until_complete(
            self.loop.create_datagram_endpoint(
                make_protocol, family=socket.AF_UNIX))
        self.assertEqual(transport._sock.family, socket.AF_UNIX)

        transport.close()
        self.loop.run_until_complete(protocol.done)
        self.assertEqual('CLOSED', protocol.state)
    1744  
    @socket_helper.skip_unless_bind_unix_socket
    def test_create_datagram_endpoint_existing_sock_unix(self):
        # Create a datagram endpoint on a UNIX socket path to which an
        # earlier, already-closed socket had been bound.
        with test_utils.unix_socket_path() as path:
            # The context manager closes the temporary socket even when
            # bind() raises, so a failure here cannot leak it.
            with socket.socket(socket.AF_UNIX,
                               type=socket.SOCK_DGRAM) as sock:
                sock.bind(path)

            coro = self.loop.create_datagram_endpoint(
                lambda: MyDatagramProto(create_future=True, loop=self.loop),
                path, family=socket.AF_UNIX)
            transport, protocol = self.loop.run_until_complete(coro)
            transport.close()
            self.loop.run_until_complete(protocol.done)
    1758  
    def test_create_datagram_endpoint_sock_sockopts(self):
        # When sock= is given, each of the options below must be rejected
        # with ValueError.
        class FakeSock:
            type = socket.SOCK_DGRAM

        conflicting_options = (
            {'local_addr': ('127.0.0.1', 0)},
            {'remote_addr': ('127.0.0.1', 0)},
            {'family': 1},
            {'proto': 1},
            {'flags': 1},
            {'reuse_port': True},
            {'allow_broadcast': True},
        )
        for option in conflicting_options:
            endpoint_coro = self.loop.create_datagram_endpoint(
                MyDatagramProto, sock=FakeSock(), **option)
            with self.assertRaises(ValueError):
                self.loop.run_until_complete(endpoint_coro)
    1790  
    @unittest.skipIf(sys.platform == 'vxworks',
                    "SO_BROADCAST is enabled by default on VxWorks")
    def test_create_datagram_endpoint_sockopts(self):
        # Socket options should not be applied unless asked for.
        # SO_REUSEPORT is not available on all platforms.
        #
        # Each phase closes its transport in a finally block, so a failing
        # option assertion cannot leave the real UDP socket open.
        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')

        def option_value(sock, option):
            # Read a SOL_SOCKET-level option from the transport's socket.
            return sock.getsockopt(socket.SOL_SOCKET, option)

        # Phase 1: defaults -- neither option may be set.
        coro = self.loop.create_datagram_endpoint(
            lambda: MyDatagramProto(create_future=True, loop=self.loop),
            local_addr=('127.0.0.1', 0))
        transport, protocol = self.loop.run_until_complete(coro)
        try:
            sock = transport.get_extra_info('socket')
            if reuseport_supported:
                self.assertFalse(option_value(sock, socket.SO_REUSEPORT))
            self.assertFalse(option_value(sock, socket.SO_BROADCAST))
        finally:
            transport.close()
            self.loop.run_until_complete(protocol.done)
        self.assertEqual('CLOSED', protocol.state)

        # Phase 2: options requested -- they must be set, while
        # SO_REUSEADDR must remain unset.
        coro = self.loop.create_datagram_endpoint(
            lambda: MyDatagramProto(create_future=True, loop=self.loop),
            local_addr=('127.0.0.1', 0),
            reuse_port=reuseport_supported,
            allow_broadcast=True)
        transport, protocol = self.loop.run_until_complete(coro)
        try:
            sock = transport.get_extra_info('socket')
            self.assertFalse(option_value(sock, socket.SO_REUSEADDR))
            if reuseport_supported:
                self.assertTrue(option_value(sock, socket.SO_REUSEPORT))
            self.assertTrue(option_value(sock, socket.SO_BROADCAST))
        finally:
            transport.close()
            self.loop.run_until_complete(protocol.done)
        self.assertEqual('CLOSED', protocol.state)
    1839  
    @patch_socket
    def test_create_datagram_endpoint_nosoreuseport(self, m_socket):
        # reuse_port=True must raise ValueError when the socket module has
        # no SO_REUSEPORT attribute.
        delattr(m_socket, 'SO_REUSEPORT')
        m_socket.socket.return_value = mock.Mock()

        endpoint_coro = self.loop.create_datagram_endpoint(
            lambda: MyDatagramProto(loop=self.loop),
            local_addr=('127.0.0.1', 0),
            reuse_port=True)

        with self.assertRaises(ValueError):
            self.loop.run_until_complete(endpoint_coro)
    1851  
    @patch_socket
    def test_create_datagram_endpoint_ip_addr(self, m_socket):
        # A literal IP local_addr must be bound without getaddrinfo() being
        # called, on a socket created as AF_INET / SOCK_DGRAM / IPPROTO_UDP.
        def forbidden_getaddrinfo(*args, **kw):
            self.fail('should not have called getaddrinfo')

        m_socket.getaddrinfo = forbidden_getaddrinfo
        bind_spy = mock.Mock()
        m_socket.socket.return_value.bind = bind_spy
        # Stub out the loop's reader registration hook.
        self.loop._add_reader = mock.Mock()

        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
        transport, _protocol = self.loop.run_until_complete(
            self.loop.create_datagram_endpoint(
                lambda: MyDatagramProto(loop=self.loop),
                local_addr=('1.2.3.4', 0),
                reuse_port=reuseport_supported))
        try:
            bind_spy.assert_called_with(('1.2.3.4', 0))
            m_socket.socket.assert_called_with(family=m_socket.AF_INET,
                                               proto=m_socket.IPPROTO_UDP,
                                               type=m_socket.SOCK_DGRAM)
        finally:
            transport.close()
            test_utils.run_briefly(self.loop)  # allow transport to close
    1876  
    def test_accept_connection_retry(self):
        # A BlockingIOError from accept() must not get the listening
        # socket closed.
        listener = mock.Mock()
        listener.accept.side_effect = BlockingIOError()

        self.loop._accept_connection(MyProto, listener)

        self.assertFalse(listener.close.called)
    1883  
    @mock.patch('asyncio.base_events.logger')
    def test_accept_connection_exception(self, m_log):
        # EMFILE from accept(): the error is logged, the listening socket
        # stays open, its reader (fd 10) is removed, and call_later() is
        # invoked with ACCEPT_RETRY_DELAY and the original arguments.
        listener = mock.Mock(**{
            'fileno.return_value': 10,
            'accept.side_effect': OSError(errno.EMFILE,
                                          'Too many open files'),
        })
        remove_reader = self.loop._remove_reader = mock.Mock()
        call_later = self.loop.call_later = mock.Mock()

        self.loop._accept_connection(MyProto, listener)

        self.assertTrue(m_log.error.called)
        self.assertFalse(listener.close.called)
        remove_reader.assert_called_with(10)
        call_later.assert_called_with(
            constants.ACCEPT_RETRY_DELAY,
            # self.loop._start_serving
            mock.ANY,
            MyProto, listener, None, None, mock.ANY, mock.ANY, mock.ANY)
    1901  
    def test_call_coroutine(self):
        # With the loop in debug mode, every scheduling entry point must
        # raise TypeError when handed a coroutine function or a coroutine
        # object in place of a plain callback.
        async def simple_coroutine():
            pass

        self.loop.set_debug(True)
        coro_obj = simple_coroutine()
        self.addCleanup(coro_obj.close)

        for target in (simple_coroutine, coro_obj):
            # Built and consumed within one iteration, so each lambda sees
            # the current ``target``.
            attempts = (
                lambda: self.loop.call_soon(target),
                lambda: self.loop.call_soon_threadsafe(target),
                lambda: self.loop.call_later(60, target),
                lambda: self.loop.call_at(self.loop.time() + 60, target),
                lambda: self.loop.run_until_complete(
                    self.loop.run_in_executor(None, target)),
            )
            for attempt in attempts:
                with self.assertRaises(TypeError):
                    attempt()
    1922  
    @mock.patch('asyncio.base_events.logger')
    def test_log_slow_callbacks(self, m_logger):
        # In debug mode with slow_callback_duration set to 0.0, a warning
        # is expected for both a plain callback and a task step.
        # The inner function names appear in the logged text that the
        # regexes below match, so they must keep these exact names.
        def stop_loop_cb(loop):
            loop.stop()

        async def stop_loop_coro(loop):
            loop.stop()

        def last_warning_text():
            # Render the most recent logger.warning() call as its message.
            fmt, *args = m_logger.warning.call_args[0]
            return fmt % tuple(args)

        asyncio.set_event_loop(self.loop)
        self.loop.set_debug(True)
        self.loop.slow_callback_duration = 0.0

        # slow callback
        self.loop.call_soon(stop_loop_cb, self.loop)
        self.loop.run_forever()
        self.assertRegex(last_warning_text(),
                         "^Executing <Handle.*stop_loop_cb.*> "
                         "took .* seconds$")

        # slow task
        asyncio.ensure_future(stop_loop_coro(self.loop), loop=self.loop)
        self.loop.run_forever()
        self.assertRegex(last_warning_text(),
                         "^Executing <Task.*stop_loop_coro.*> "
                         "took .* seconds$")
    1950  
    1951  
    1952  class ESC[4;38;5;81mRunningLoopTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1953  
    1954      def test_running_loop_within_a_loop(self):
    1955          async def runner(loop):
    1956              loop.run_forever()
    1957  
    1958          loop = asyncio.new_event_loop()
    1959          outer_loop = asyncio.new_event_loop()
    1960          try:
    1961              with self.assertRaisesRegex(RuntimeError,
    1962                                          'while another loop is running'):
    1963                  outer_loop.run_until_complete(runner(loop))
    1964          finally:
    1965              loop.close()
    1966              outer_loop.close()
    1967  
    1968  
    1969  class ESC[4;38;5;81mBaseLoopSockSendfileTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1970  
    1971      DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    1972  
    class MyProto(asyncio.Protocol):
        # Recording protocol: tracks start/close, accumulates received
        # bytes, and exposes a future that completes on connection loss.

        def __init__(self, loop):
            self.transport = None
            self.data = bytearray()
            self.started = False
            self.closed = False
            # Completed by connection_lost(); awaited by wait_closed().
            self.fut = loop.create_future()

        def connection_made(self, transport):
            self.transport = transport
            self.started = True

        def data_received(self, data):
            self.data.extend(data)

        def connection_lost(self, exc):
            self.closed = True
            self.fut.set_result(None)
            self.transport = None

        async def wait_closed(self):
            # Returns once connection_lost() has been called.
            await self.fut
    1996  
    @classmethod
    def setUpClass(cls):
        # Remember the sendfile fallback read-buffer size, replace it with
        # 16 KiB for this class, and write DATA to the shared test file.
        saved_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE
        cls.__old_bufsize = saved_bufsize
        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 16 * 1024
        with open(os_helper.TESTFN, 'wb') as payload_file:
            payload_file.write(cls.DATA)
        super().setUpClass()
    2004  
    @classmethod
    def tearDownClass(cls):
        # Restore the buffer size saved as cls.__old_bufsize and remove
        # the shared test file.
        saved_bufsize = cls.__old_bufsize
        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = saved_bufsize
        os_helper.unlink(os_helper.TESTFN)
        super().tearDownClass()
    2010  
    def setUp(self):
        # Per-test fixtures: a fresh selector-based loop and a read handle
        # on the shared test file, closed again through addCleanup().
        from asyncio import selector_events
        # BaseSelectorEventLoop() has no native implementation
        self.loop = selector_events.BaseSelectorEventLoop()
        self.set_event_loop(self.loop)
        source = open(os_helper.TESTFN, 'rb')
        self.file = source
        self.addCleanup(source.close)
        super().setUp()
    2019  
    2020      def make_socket(self, blocking=False):
    2021          sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    2022          sock.setblocking(blocking)
    2023          self.addCleanup(sock.close)
    2024          return sock
    2025  
    2026      def run_loop(self, coro):
    2027          return self.loop.run_until_complete(coro)
    2028  
    2029      def prepare(self):
    2030          sock = self.make_socket()
    2031          proto = self.MyProto(self.loop)
    2032          server = self.run_loop(self.loop.create_server(
    2033              lambda: proto, socket_helper.HOST, 0, family=socket.AF_INET))
    2034          addr = server.sockets[0].getsockname()
    2035  
    2036          for _ in range(10):
    2037              try:
    2038                  self.run_loop(self.loop.sock_connect(sock, addr))
    2039              except OSError:
    2040                  self.run_loop(asyncio.sleep(0.5))
    2041                  continue
    2042              else:
    2043                  break
    2044          else:
    2045              # One last try, so we get the exception
    2046              self.run_loop(self.loop.sock_connect(sock, addr))
    2047  
    2048          def cleanup():
    2049              server.close()
    2050              sock.close()
    2051              if proto.transport is not None:
    2052                  proto.transport.close()
    2053                  self.run_loop(proto.wait_closed())
    2054              self.run_loop(server.wait_closed())
    2055  
    2056          self.addCleanup(cleanup)
    2057  
    2058          return sock, proto
    2059  
    2060      def test__sock_sendfile_native_failure(self):
    2061          sock, proto = self.prepare()
    2062  
    2063          with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
    2064                                      "sendfile is not available"):
    2065              self.run_loop(self.loop._sock_sendfile_native(sock, self.file,
    2066                                                            0, None))
    2067  
    2068          self.assertEqual(proto.data, b'')
    2069          self.assertEqual(self.file.tell(), 0)
    2070  
    2071      def test_sock_sendfile_no_fallback(self):
    2072          sock, proto = self.prepare()
    2073  
    2074          with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
    2075                                      "sendfile is not available"):
    2076              self.run_loop(self.loop.sock_sendfile(sock, self.file,
    2077                                                    fallback=False))
    2078  
    2079          self.assertEqual(self.file.tell(), 0)
    2080          self.assertEqual(proto.data, b'')
    2081  
    2082      def test_sock_sendfile_fallback(self):
    2083          sock, proto = self.prepare()
    2084  
    2085          ret = self.run_loop(self.loop.sock_sendfile(sock, self.file))
    2086          sock.close()
    2087          self.run_loop(proto.wait_closed())
    2088  
    2089          self.assertEqual(ret, len(self.DATA))
    2090          self.assertEqual(self.file.tell(), len(self.DATA))
    2091          self.assertEqual(proto.data, self.DATA)
    2092  
    2093      def test_sock_sendfile_fallback_offset_and_count(self):
    2094          sock, proto = self.prepare()
    2095  
    2096          ret = self.run_loop(self.loop.sock_sendfile(sock, self.file,
    2097                                                      1000, 2000))
    2098          sock.close()
    2099          self.run_loop(proto.wait_closed())
    2100  
    2101          self.assertEqual(ret, 2000)
    2102          self.assertEqual(self.file.tell(), 3000)
    2103          self.assertEqual(proto.data, self.DATA[1000:3000])
    2104  
    2105      def test_blocking_socket(self):
    2106          self.loop.set_debug(True)
    2107          sock = self.make_socket(blocking=True)
    2108          with self.assertRaisesRegex(ValueError, "must be non-blocking"):
    2109              self.run_loop(self.loop.sock_sendfile(sock, self.file))
    2110  
    2111      def test_nonbinary_file(self):
    2112          sock = self.make_socket()
    2113          with open(os_helper.TESTFN, encoding="utf-8") as f:
    2114              with self.assertRaisesRegex(ValueError, "binary mode"):
    2115                  self.run_loop(self.loop.sock_sendfile(sock, f))
    2116  
    2117      def test_nonstream_socket(self):
    2118          sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    2119          sock.setblocking(False)
    2120          self.addCleanup(sock.close)
    2121          with self.assertRaisesRegex(ValueError, "only SOCK_STREAM type"):
    2122              self.run_loop(self.loop.sock_sendfile(sock, self.file))
    2123  
    2124      def test_notint_count(self):
    2125          sock = self.make_socket()
    2126          with self.assertRaisesRegex(TypeError,
    2127                                      "count must be a positive integer"):
    2128              self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, 'count'))
    2129  
    2130      def test_negative_count(self):
    2131          sock = self.make_socket()
    2132          with self.assertRaisesRegex(ValueError,
    2133                                      "count must be a positive integer"):
    2134              self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, -1))
    2135  
    2136      def test_notint_offset(self):
    2137          sock = self.make_socket()
    2138          with self.assertRaisesRegex(TypeError,
    2139                                      "offset must be a non-negative integer"):
    2140              self.run_loop(self.loop.sock_sendfile(sock, self.file, 'offset'))
    2141  
    2142      def test_negative_offset(self):
    2143          sock = self.make_socket()
    2144          with self.assertRaisesRegex(ValueError,
    2145                                      "offset must be a non-negative integer"):
    2146              self.run_loop(self.loop.sock_sendfile(sock, self.file, -1))
    2147  
    2148  
    2149  class ESC[4;38;5;81mTestSelectorUtils(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2150      def check_set_nodelay(self, sock):
    2151          opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
    2152          self.assertFalse(opt)
    2153  
    2154          base_events._set_nodelay(sock)
    2155  
    2156          opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
    2157          self.assertTrue(opt)
    2158  
    2159      @unittest.skipUnless(hasattr(socket, 'TCP_NODELAY'),
    2160                           'need socket.TCP_NODELAY')
    2161      def test_set_nodelay(self):
    2162          sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM,
    2163                               proto=socket.IPPROTO_TCP)
    2164          with sock:
    2165              self.check_set_nodelay(sock)
    2166  
    2167          sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM,
    2168                               proto=socket.IPPROTO_TCP)
    2169          with sock:
    2170              sock.setblocking(False)
    2171              self.check_set_nodelay(sock)
    2172  
    2173  
    2174  
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()