python (3.12.0)

(root)/
lib/
python3.12/
test/
test_asyncio/
test_unix_events.py
       1  """Tests for unix_events.py."""
       2  
       3  import contextlib
       4  import errno
       5  import io
       6  import multiprocessing
       7  import os
       8  import pathlib
       9  import signal
      10  import socket
      11  import stat
      12  import sys
      13  import threading
      14  import unittest
      15  from unittest import mock
      16  import warnings
      17  from test.support import os_helper
      18  from test.support import socket_helper
      19  from test.support import wait_process
      20  from test.support import hashlib_helper
      21  
      22  if sys.platform == 'win32':
      23      raise unittest.SkipTest('UNIX only')
      24  
      25  
      26  import asyncio
      27  from asyncio import log
      28  from asyncio import unix_events
      29  from test.test_asyncio import utils as test_utils
      30  
      31  
      32  def tearDownModule():
      33      asyncio.set_event_loop_policy(None)
      34  
      35  
      36  MOCK_ANY = mock.ANY
      37  
      38  
      39  def EXITCODE(exitcode):
      40      return 32768 + exitcode
      41  
      42  
      43  def SIGNAL(signum):
      44      if not 1 <= signum <= 68:
      45          raise AssertionError(f'invalid signum {signum}')
      46      return 32768 - signum
      47  
      48  
      49  def close_pipe_transport(transport):
      50      # Don't call transport.close() because the event loop and the selector
      51      # are mocked
      52      if transport._pipe is None:
      53          return
      54      transport._pipe.close()
      55      transport._pipe = None
      56  
      57  
      58  @unittest.skipUnless(signal, 'Signals are not supported')
      59  class ESC[4;38;5;81mSelectorEventLoopSignalTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      60  
      61      def setUp(self):
      62          super().setUp()
      63          self.loop = asyncio.SelectorEventLoop()
      64          self.set_event_loop(self.loop)
      65  
      66      def test_check_signal(self):
      67          self.assertRaises(
      68              TypeError, self.loop._check_signal, '1')
      69          self.assertRaises(
      70              ValueError, self.loop._check_signal, signal.NSIG + 1)
      71  
      72      def test_handle_signal_no_handler(self):
      73          self.loop._handle_signal(signal.NSIG + 1)
      74  
      75      def test_handle_signal_cancelled_handler(self):
      76          h = asyncio.Handle(mock.Mock(), (),
      77                             loop=mock.Mock())
      78          h.cancel()
      79          self.loop._signal_handlers[signal.NSIG + 1] = h
      80          self.loop.remove_signal_handler = mock.Mock()
      81          self.loop._handle_signal(signal.NSIG + 1)
      82          self.loop.remove_signal_handler.assert_called_with(signal.NSIG + 1)
      83  
      84      @mock.patch('asyncio.unix_events.signal')
      85      def test_add_signal_handler_setup_error(self, m_signal):
      86          m_signal.NSIG = signal.NSIG
      87          m_signal.valid_signals = signal.valid_signals
      88          m_signal.set_wakeup_fd.side_effect = ValueError
      89  
      90          self.assertRaises(
      91              RuntimeError,
      92              self.loop.add_signal_handler,
      93              signal.SIGINT, lambda: True)
      94  
      95      @mock.patch('asyncio.unix_events.signal')
      96      def test_add_signal_handler_coroutine_error(self, m_signal):
      97          m_signal.NSIG = signal.NSIG
      98  
      99          async def simple_coroutine():
     100              pass
     101  
     102          # callback must not be a coroutine function
     103          coro_func = simple_coroutine
     104          coro_obj = coro_func()
     105          self.addCleanup(coro_obj.close)
     106          for func in (coro_func, coro_obj):
     107              self.assertRaisesRegex(
     108                  TypeError, 'coroutines cannot be used with add_signal_handler',
     109                  self.loop.add_signal_handler,
     110                  signal.SIGINT, func)
     111  
     112      @mock.patch('asyncio.unix_events.signal')
     113      def test_add_signal_handler(self, m_signal):
     114          m_signal.NSIG = signal.NSIG
     115          m_signal.valid_signals = signal.valid_signals
     116  
     117          cb = lambda: True
     118          self.loop.add_signal_handler(signal.SIGHUP, cb)
     119          h = self.loop._signal_handlers.get(signal.SIGHUP)
     120          self.assertIsInstance(h, asyncio.Handle)
     121          self.assertEqual(h._callback, cb)
     122  
     123      @mock.patch('asyncio.unix_events.signal')
     124      def test_add_signal_handler_install_error(self, m_signal):
     125          m_signal.NSIG = signal.NSIG
     126          m_signal.valid_signals = signal.valid_signals
     127  
     128          def set_wakeup_fd(fd):
     129              if fd == -1:
     130                  raise ValueError()
     131          m_signal.set_wakeup_fd = set_wakeup_fd
     132  
     133          class ESC[4;38;5;81mErr(ESC[4;38;5;149mOSError):
     134              errno = errno.EFAULT
     135          m_signal.signal.side_effect = Err
     136  
     137          self.assertRaises(
     138              Err,
     139              self.loop.add_signal_handler,
     140              signal.SIGINT, lambda: True)
     141  
     142      @mock.patch('asyncio.unix_events.signal')
     143      @mock.patch('asyncio.base_events.logger')
     144      def test_add_signal_handler_install_error2(self, m_logging, m_signal):
     145          m_signal.NSIG = signal.NSIG
     146          m_signal.valid_signals = signal.valid_signals
     147  
     148          class ESC[4;38;5;81mErr(ESC[4;38;5;149mOSError):
     149              errno = errno.EINVAL
     150          m_signal.signal.side_effect = Err
     151  
     152          self.loop._signal_handlers[signal.SIGHUP] = lambda: True
     153          self.assertRaises(
     154              RuntimeError,
     155              self.loop.add_signal_handler,
     156              signal.SIGINT, lambda: True)
     157          self.assertFalse(m_logging.info.called)
     158          self.assertEqual(1, m_signal.set_wakeup_fd.call_count)
     159  
     160      @mock.patch('asyncio.unix_events.signal')
     161      @mock.patch('asyncio.base_events.logger')
     162      def test_add_signal_handler_install_error3(self, m_logging, m_signal):
     163          class ESC[4;38;5;81mErr(ESC[4;38;5;149mOSError):
     164              errno = errno.EINVAL
     165          m_signal.signal.side_effect = Err
     166          m_signal.NSIG = signal.NSIG
     167          m_signal.valid_signals = signal.valid_signals
     168  
     169          self.assertRaises(
     170              RuntimeError,
     171              self.loop.add_signal_handler,
     172              signal.SIGINT, lambda: True)
     173          self.assertFalse(m_logging.info.called)
     174          self.assertEqual(2, m_signal.set_wakeup_fd.call_count)
     175  
     176      @mock.patch('asyncio.unix_events.signal')
     177      def test_remove_signal_handler(self, m_signal):
     178          m_signal.NSIG = signal.NSIG
     179          m_signal.valid_signals = signal.valid_signals
     180  
     181          self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
     182  
     183          self.assertTrue(
     184              self.loop.remove_signal_handler(signal.SIGHUP))
     185          self.assertTrue(m_signal.set_wakeup_fd.called)
     186          self.assertTrue(m_signal.signal.called)
     187          self.assertEqual(
     188              (signal.SIGHUP, m_signal.SIG_DFL), m_signal.signal.call_args[0])
     189  
     190      @mock.patch('asyncio.unix_events.signal')
     191      def test_remove_signal_handler_2(self, m_signal):
     192          m_signal.NSIG = signal.NSIG
     193          m_signal.SIGINT = signal.SIGINT
     194          m_signal.valid_signals = signal.valid_signals
     195  
     196          self.loop.add_signal_handler(signal.SIGINT, lambda: True)
     197          self.loop._signal_handlers[signal.SIGHUP] = object()
     198          m_signal.set_wakeup_fd.reset_mock()
     199  
     200          self.assertTrue(
     201              self.loop.remove_signal_handler(signal.SIGINT))
     202          self.assertFalse(m_signal.set_wakeup_fd.called)
     203          self.assertTrue(m_signal.signal.called)
     204          self.assertEqual(
     205              (signal.SIGINT, m_signal.default_int_handler),
     206              m_signal.signal.call_args[0])
     207  
     208      @mock.patch('asyncio.unix_events.signal')
     209      @mock.patch('asyncio.base_events.logger')
     210      def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal):
     211          m_signal.NSIG = signal.NSIG
     212          m_signal.valid_signals = signal.valid_signals
     213          self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
     214  
     215          m_signal.set_wakeup_fd.side_effect = ValueError
     216  
     217          self.loop.remove_signal_handler(signal.SIGHUP)
     218          self.assertTrue(m_logging.info)
     219  
     220      @mock.patch('asyncio.unix_events.signal')
     221      def test_remove_signal_handler_error(self, m_signal):
     222          m_signal.NSIG = signal.NSIG
     223          m_signal.valid_signals = signal.valid_signals
     224          self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
     225  
     226          m_signal.signal.side_effect = OSError
     227  
     228          self.assertRaises(
     229              OSError, self.loop.remove_signal_handler, signal.SIGHUP)
     230  
     231      @mock.patch('asyncio.unix_events.signal')
     232      def test_remove_signal_handler_error2(self, m_signal):
     233          m_signal.NSIG = signal.NSIG
     234          m_signal.valid_signals = signal.valid_signals
     235          self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
     236  
     237          class ESC[4;38;5;81mErr(ESC[4;38;5;149mOSError):
     238              errno = errno.EINVAL
     239          m_signal.signal.side_effect = Err
     240  
     241          self.assertRaises(
     242              RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP)
     243  
     244      @mock.patch('asyncio.unix_events.signal')
     245      def test_close(self, m_signal):
     246          m_signal.NSIG = signal.NSIG
     247          m_signal.valid_signals = signal.valid_signals
     248  
     249          self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
     250          self.loop.add_signal_handler(signal.SIGCHLD, lambda: True)
     251  
     252          self.assertEqual(len(self.loop._signal_handlers), 2)
     253  
     254          m_signal.set_wakeup_fd.reset_mock()
     255  
     256          self.loop.close()
     257  
     258          self.assertEqual(len(self.loop._signal_handlers), 0)
     259          m_signal.set_wakeup_fd.assert_called_once_with(-1)
     260  
     261      @mock.patch('asyncio.unix_events.sys')
     262      @mock.patch('asyncio.unix_events.signal')
     263      def test_close_on_finalizing(self, m_signal, m_sys):
     264          m_signal.NSIG = signal.NSIG
     265          m_signal.valid_signals = signal.valid_signals
     266          self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
     267  
     268          self.assertEqual(len(self.loop._signal_handlers), 1)
     269          m_sys.is_finalizing.return_value = True
     270          m_signal.signal.reset_mock()
     271  
     272          with self.assertWarnsRegex(ResourceWarning,
     273                                     "skipping signal handlers removal"):
     274              self.loop.close()
     275  
     276          self.assertEqual(len(self.loop._signal_handlers), 0)
     277          self.assertFalse(m_signal.signal.called)
     278  
     279  
     280  @unittest.skipUnless(hasattr(socket, 'AF_UNIX'),
     281                       'UNIX Sockets are not supported')
     282  class ESC[4;38;5;81mSelectorEventLoopUnixSocketTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     283  
     284      def setUp(self):
     285          super().setUp()
     286          self.loop = asyncio.SelectorEventLoop()
     287          self.set_event_loop(self.loop)
     288  
     289      @socket_helper.skip_unless_bind_unix_socket
     290      def test_create_unix_server_existing_path_sock(self):
     291          with test_utils.unix_socket_path() as path:
     292              sock = socket.socket(socket.AF_UNIX)
     293              sock.bind(path)
     294              sock.listen(1)
     295              sock.close()
     296  
     297              coro = self.loop.create_unix_server(lambda: None, path)
     298              srv = self.loop.run_until_complete(coro)
     299              srv.close()
     300              self.loop.run_until_complete(srv.wait_closed())
     301  
     302      @socket_helper.skip_unless_bind_unix_socket
     303      def test_create_unix_server_pathlib(self):
     304          with test_utils.unix_socket_path() as path:
     305              path = pathlib.Path(path)
     306              srv_coro = self.loop.create_unix_server(lambda: None, path)
     307              srv = self.loop.run_until_complete(srv_coro)
     308              srv.close()
     309              self.loop.run_until_complete(srv.wait_closed())
     310  
     311      def test_create_unix_connection_pathlib(self):
     312          with test_utils.unix_socket_path() as path:
     313              path = pathlib.Path(path)
     314              coro = self.loop.create_unix_connection(lambda: None, path)
     315              with self.assertRaises(FileNotFoundError):
     316                  # If pathlib.Path wasn't supported, the exception would be
     317                  # different.
     318                  self.loop.run_until_complete(coro)
     319  
     320      def test_create_unix_server_existing_path_nonsock(self):
     321          path = test_utils.gen_unix_socket_path()
     322          self.addCleanup(os_helper.unlink, path)
     323          # create the file
     324          open(path, "wb").close()
     325  
     326          coro = self.loop.create_unix_server(lambda: None, path)
     327          with self.assertRaisesRegex(OSError,
     328                                      'Address.*is already in use'):
     329              self.loop.run_until_complete(coro)
     330  
     331      def test_create_unix_server_ssl_bool(self):
     332          coro = self.loop.create_unix_server(lambda: None, path='spam',
     333                                              ssl=True)
     334          with self.assertRaisesRegex(TypeError,
     335                                      'ssl argument must be an SSLContext'):
     336              self.loop.run_until_complete(coro)
     337  
     338      def test_create_unix_server_nopath_nosock(self):
     339          coro = self.loop.create_unix_server(lambda: None, path=None)
     340          with self.assertRaisesRegex(ValueError,
     341                                      'path was not specified, and no sock'):
     342              self.loop.run_until_complete(coro)
     343  
     344      def test_create_unix_server_path_inetsock(self):
     345          sock = socket.socket()
     346          with sock:
     347              coro = self.loop.create_unix_server(lambda: None, path=None,
     348                                                  sock=sock)
     349              with self.assertRaisesRegex(ValueError,
     350                                          'A UNIX Domain Stream.*was expected'):
     351                  self.loop.run_until_complete(coro)
     352  
     353      def test_create_unix_server_path_dgram(self):
     354          sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
     355          with sock:
     356              coro = self.loop.create_unix_server(lambda: None, path=None,
     357                                                  sock=sock)
     358              with self.assertRaisesRegex(ValueError,
     359                                          'A UNIX Domain Stream.*was expected'):
     360                  self.loop.run_until_complete(coro)
     361  
     362      @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
     363                           'no socket.SOCK_NONBLOCK (linux only)')
     364      @socket_helper.skip_unless_bind_unix_socket
     365      def test_create_unix_server_path_stream_bittype(self):
     366          fn = test_utils.gen_unix_socket_path()
     367          self.addCleanup(os_helper.unlink, fn)
     368  
     369          sock = socket.socket(socket.AF_UNIX,
     370                               socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
     371          with sock:
     372              sock.bind(fn)
     373              coro = self.loop.create_unix_server(lambda: None, path=None,
     374                                                  sock=sock)
     375              srv = self.loop.run_until_complete(coro)
     376              srv.close()
     377              self.loop.run_until_complete(srv.wait_closed())
     378  
     379      def test_create_unix_server_ssl_timeout_with_plain_sock(self):
     380          coro = self.loop.create_unix_server(lambda: None, path='spam',
     381                                              ssl_handshake_timeout=1)
     382          with self.assertRaisesRegex(
     383                  ValueError,
     384                  'ssl_handshake_timeout is only meaningful with ssl'):
     385              self.loop.run_until_complete(coro)
     386  
     387      def test_create_unix_connection_path_inetsock(self):
     388          sock = socket.socket()
     389          with sock:
     390              coro = self.loop.create_unix_connection(lambda: None,
     391                                                      sock=sock)
     392              with self.assertRaisesRegex(ValueError,
     393                                          'A UNIX Domain Stream.*was expected'):
     394                  self.loop.run_until_complete(coro)
     395  
     396      @mock.patch('asyncio.unix_events.socket')
     397      def test_create_unix_server_bind_error(self, m_socket):
     398          # Ensure that the socket is closed on any bind error
     399          sock = mock.Mock()
     400          m_socket.socket.return_value = sock
     401  
     402          sock.bind.side_effect = OSError
     403          coro = self.loop.create_unix_server(lambda: None, path="/test")
     404          with self.assertRaises(OSError):
     405              self.loop.run_until_complete(coro)
     406          self.assertTrue(sock.close.called)
     407  
     408          sock.bind.side_effect = MemoryError
     409          coro = self.loop.create_unix_server(lambda: None, path="/test")
     410          with self.assertRaises(MemoryError):
     411              self.loop.run_until_complete(coro)
     412          self.assertTrue(sock.close.called)
     413  
     414      def test_create_unix_connection_path_sock(self):
     415          coro = self.loop.create_unix_connection(
     416              lambda: None, os.devnull, sock=object())
     417          with self.assertRaisesRegex(ValueError, 'path and sock can not be'):
     418              self.loop.run_until_complete(coro)
     419  
     420      def test_create_unix_connection_nopath_nosock(self):
     421          coro = self.loop.create_unix_connection(
     422              lambda: None, None)
     423          with self.assertRaisesRegex(ValueError,
     424                                      'no path and sock were specified'):
     425              self.loop.run_until_complete(coro)
     426  
     427      def test_create_unix_connection_nossl_serverhost(self):
     428          coro = self.loop.create_unix_connection(
     429              lambda: None, os.devnull, server_hostname='spam')
     430          with self.assertRaisesRegex(ValueError,
     431                                      'server_hostname is only meaningful'):
     432              self.loop.run_until_complete(coro)
     433  
     434      def test_create_unix_connection_ssl_noserverhost(self):
     435          coro = self.loop.create_unix_connection(
     436              lambda: None, os.devnull, ssl=True)
     437  
     438          with self.assertRaisesRegex(
     439              ValueError, 'you have to pass server_hostname when using ssl'):
     440  
     441              self.loop.run_until_complete(coro)
     442  
     443      def test_create_unix_connection_ssl_timeout_with_plain_sock(self):
     444          coro = self.loop.create_unix_connection(lambda: None, path='spam',
     445                                              ssl_handshake_timeout=1)
     446          with self.assertRaisesRegex(
     447                  ValueError,
     448                  'ssl_handshake_timeout is only meaningful with ssl'):
     449              self.loop.run_until_complete(coro)
     450  
     451  
     452  @unittest.skipUnless(hasattr(os, 'sendfile'),
     453                       'sendfile is not supported')
     454  class ESC[4;38;5;81mSelectorEventLoopUnixSockSendfileTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     455      DATA = b"12345abcde" * 16 * 1024  # 160 KiB
     456  
     457      class ESC[4;38;5;81mMyProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mProtocol):
     458  
     459          def __init__(self, loop):
     460              self.started = False
     461              self.closed = False
     462              self.data = bytearray()
     463              self.fut = loop.create_future()
     464              self.transport = None
     465              self._ready = loop.create_future()
     466  
     467          def connection_made(self, transport):
     468              self.started = True
     469              self.transport = transport
     470              self._ready.set_result(None)
     471  
     472          def data_received(self, data):
     473              self.data.extend(data)
     474  
     475          def connection_lost(self, exc):
     476              self.closed = True
     477              self.fut.set_result(None)
     478  
     479          async def wait_closed(self):
     480              await self.fut
     481  
     482      @classmethod
     483      def setUpClass(cls):
     484          with open(os_helper.TESTFN, 'wb') as fp:
     485              fp.write(cls.DATA)
     486          super().setUpClass()
     487  
     488      @classmethod
     489      def tearDownClass(cls):
     490          os_helper.unlink(os_helper.TESTFN)
     491          super().tearDownClass()
     492  
     493      def setUp(self):
     494          self.loop = asyncio.new_event_loop()
     495          self.set_event_loop(self.loop)
     496          self.file = open(os_helper.TESTFN, 'rb')
     497          self.addCleanup(self.file.close)
     498          super().setUp()
     499  
     500      def make_socket(self, cleanup=True):
     501          sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     502          sock.setblocking(False)
     503          sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
     504          sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
     505          if cleanup:
     506              self.addCleanup(sock.close)
     507          return sock
     508  
     509      def run_loop(self, coro):
     510          return self.loop.run_until_complete(coro)
     511  
     512      def prepare(self):
     513          sock = self.make_socket()
     514          proto = self.MyProto(self.loop)
     515          port = socket_helper.find_unused_port()
     516          srv_sock = self.make_socket(cleanup=False)
     517          srv_sock.bind((socket_helper.HOST, port))
     518          server = self.run_loop(self.loop.create_server(
     519              lambda: proto, sock=srv_sock))
     520          self.run_loop(self.loop.sock_connect(sock, (socket_helper.HOST, port)))
     521          self.run_loop(proto._ready)
     522  
     523          def cleanup():
     524              proto.transport.close()
     525              self.run_loop(proto.wait_closed())
     526  
     527              server.close()
     528              self.run_loop(server.wait_closed())
     529  
     530          self.addCleanup(cleanup)
     531  
     532          return sock, proto
     533  
     534      def test_sock_sendfile_not_available(self):
     535          sock, proto = self.prepare()
     536          with mock.patch('asyncio.unix_events.os', spec=[]):
     537              with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
     538                                          "os[.]sendfile[(][)] is not available"):
     539                  self.run_loop(self.loop._sock_sendfile_native(sock, self.file,
     540                                                                0, None))
     541          self.assertEqual(self.file.tell(), 0)
     542  
     543      def test_sock_sendfile_not_a_file(self):
     544          sock, proto = self.prepare()
     545          f = object()
     546          with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
     547                                      "not a regular file"):
     548              self.run_loop(self.loop._sock_sendfile_native(sock, f,
     549                                                            0, None))
     550          self.assertEqual(self.file.tell(), 0)
     551  
     552      def test_sock_sendfile_iobuffer(self):
     553          sock, proto = self.prepare()
     554          f = io.BytesIO()
     555          with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
     556                                      "not a regular file"):
     557              self.run_loop(self.loop._sock_sendfile_native(sock, f,
     558                                                            0, None))
     559          self.assertEqual(self.file.tell(), 0)
     560  
     561      def test_sock_sendfile_not_regular_file(self):
     562          sock, proto = self.prepare()
     563          f = mock.Mock()
     564          f.fileno.return_value = -1
     565          with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
     566                                      "not a regular file"):
     567              self.run_loop(self.loop._sock_sendfile_native(sock, f,
     568                                                            0, None))
     569          self.assertEqual(self.file.tell(), 0)
     570  
     571      def test_sock_sendfile_cancel1(self):
     572          sock, proto = self.prepare()
     573  
     574          fut = self.loop.create_future()
     575          fileno = self.file.fileno()
     576          self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
     577                                               0, None, len(self.DATA), 0)
     578          fut.cancel()
     579          with contextlib.suppress(asyncio.CancelledError):
     580              self.run_loop(fut)
     581          with self.assertRaises(KeyError):
     582              self.loop._selector.get_key(sock)
     583  
     584      def test_sock_sendfile_cancel2(self):
     585          sock, proto = self.prepare()
     586  
     587          fut = self.loop.create_future()
     588          fileno = self.file.fileno()
     589          self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
     590                                               0, None, len(self.DATA), 0)
     591          fut.cancel()
     592          self.loop._sock_sendfile_native_impl(fut, sock.fileno(), sock, fileno,
     593                                               0, None, len(self.DATA), 0)
     594          with self.assertRaises(KeyError):
     595              self.loop._selector.get_key(sock)
     596  
     597      def test_sock_sendfile_blocking_error(self):
     598          sock, proto = self.prepare()
     599  
     600          fileno = self.file.fileno()
     601          fut = mock.Mock()
     602          fut.cancelled.return_value = False
     603          with mock.patch('os.sendfile', side_effect=BlockingIOError()):
     604              self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
     605                                                   0, None, len(self.DATA), 0)
     606          key = self.loop._selector.get_key(sock)
     607          self.assertIsNotNone(key)
     608          fut.add_done_callback.assert_called_once_with(mock.ANY)
     609  
     610      def test_sock_sendfile_os_error_first_call(self):
     611          sock, proto = self.prepare()
     612  
     613          fileno = self.file.fileno()
     614          fut = self.loop.create_future()
     615          with mock.patch('os.sendfile', side_effect=OSError()):
     616              self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
     617                                                   0, None, len(self.DATA), 0)
     618          with self.assertRaises(KeyError):
     619              self.loop._selector.get_key(sock)
     620          exc = fut.exception()
     621          self.assertIsInstance(exc, asyncio.SendfileNotAvailableError)
     622          self.assertEqual(0, self.file.tell())
     623  
     624      def test_sock_sendfile_os_error_next_call(self):
     625          sock, proto = self.prepare()
     626  
     627          fileno = self.file.fileno()
     628          fut = self.loop.create_future()
     629          err = OSError()
     630          with mock.patch('os.sendfile', side_effect=err):
     631              self.loop._sock_sendfile_native_impl(fut, sock.fileno(),
     632                                                   sock, fileno,
     633                                                   1000, None, len(self.DATA),
     634                                                   1000)
     635          with self.assertRaises(KeyError):
     636              self.loop._selector.get_key(sock)
     637          exc = fut.exception()
     638          self.assertIs(exc, err)
     639          self.assertEqual(1000, self.file.tell())
     640  
     641      def test_sock_sendfile_exception(self):
     642          sock, proto = self.prepare()
     643  
     644          fileno = self.file.fileno()
     645          fut = self.loop.create_future()
     646          err = asyncio.SendfileNotAvailableError()
     647          with mock.patch('os.sendfile', side_effect=err):
     648              self.loop._sock_sendfile_native_impl(fut, sock.fileno(),
     649                                                   sock, fileno,
     650                                                   1000, None, len(self.DATA),
     651                                                   1000)
     652          with self.assertRaises(KeyError):
     653              self.loop._selector.get_key(sock)
     654          exc = fut.exception()
     655          self.assertIs(exc, err)
     656          self.assertEqual(1000, self.file.tell())
     657  
     658  
     659  class ESC[4;38;5;81mUnixReadPipeTransportTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     660  
     661      def setUp(self):
     662          super().setUp()
     663          self.loop = self.new_test_loop()
     664          self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
     665          self.pipe = mock.Mock(spec_set=io.RawIOBase)
     666          self.pipe.fileno.return_value = 5
     667  
     668          blocking_patcher = mock.patch('os.set_blocking')
     669          blocking_patcher.start()
     670          self.addCleanup(blocking_patcher.stop)
     671  
     672          fstat_patcher = mock.patch('os.fstat')
     673          m_fstat = fstat_patcher.start()
     674          st = mock.Mock()
     675          st.st_mode = stat.S_IFIFO
     676          m_fstat.return_value = st
     677          self.addCleanup(fstat_patcher.stop)
     678  
     679      def read_pipe_transport(self, waiter=None):
     680          transport = unix_events._UnixReadPipeTransport(self.loop, self.pipe,
     681                                                         self.protocol,
     682                                                         waiter=waiter)
     683          self.addCleanup(close_pipe_transport, transport)
     684          return transport
     685  
     686      def test_ctor(self):
     687          waiter = self.loop.create_future()
     688          tr = self.read_pipe_transport(waiter=waiter)
     689          self.loop.run_until_complete(waiter)
     690  
     691          self.protocol.connection_made.assert_called_with(tr)
     692          self.loop.assert_reader(5, tr._read_ready)
     693          self.assertIsNone(waiter.result())
     694  
     695      @mock.patch('os.read')
     696      def test__read_ready(self, m_read):
     697          tr = self.read_pipe_transport()
     698          m_read.return_value = b'data'
     699          tr._read_ready()
     700  
     701          m_read.assert_called_with(5, tr.max_size)
     702          self.protocol.data_received.assert_called_with(b'data')
     703  
     704      @mock.patch('os.read')
     705      def test__read_ready_eof(self, m_read):
     706          tr = self.read_pipe_transport()
     707          m_read.return_value = b''
     708          tr._read_ready()
     709  
     710          m_read.assert_called_with(5, tr.max_size)
     711          self.assertFalse(self.loop.readers)
     712          test_utils.run_briefly(self.loop)
     713          self.protocol.eof_received.assert_called_with()
     714          self.protocol.connection_lost.assert_called_with(None)
     715  
     716      @mock.patch('os.read')
     717      def test__read_ready_blocked(self, m_read):
     718          tr = self.read_pipe_transport()
     719          m_read.side_effect = BlockingIOError
     720          tr._read_ready()
     721  
     722          m_read.assert_called_with(5, tr.max_size)
     723          test_utils.run_briefly(self.loop)
     724          self.assertFalse(self.protocol.data_received.called)
     725  
     726      @mock.patch('asyncio.log.logger.error')
     727      @mock.patch('os.read')
     728      def test__read_ready_error(self, m_read, m_logexc):
     729          tr = self.read_pipe_transport()
     730          err = OSError()
     731          m_read.side_effect = err
     732          tr._close = mock.Mock()
     733          tr._read_ready()
     734  
     735          m_read.assert_called_with(5, tr.max_size)
     736          tr._close.assert_called_with(err)
     737          m_logexc.assert_called_with(
     738              test_utils.MockPattern(
     739                  'Fatal read error on pipe transport'
     740                  '\nprotocol:.*\ntransport:.*'),
     741              exc_info=(OSError, MOCK_ANY, MOCK_ANY))
     742  
     743      @mock.patch('os.read')
     744      def test_pause_reading(self, m_read):
     745          tr = self.read_pipe_transport()
     746          m = mock.Mock()
     747          self.loop.add_reader(5, m)
     748          tr.pause_reading()
     749          self.assertFalse(self.loop.readers)
     750  
     751      @mock.patch('os.read')
     752      def test_resume_reading(self, m_read):
     753          tr = self.read_pipe_transport()
     754          tr.pause_reading()
     755          tr.resume_reading()
     756          self.loop.assert_reader(5, tr._read_ready)
     757  
     758      @mock.patch('os.read')
     759      def test_close(self, m_read):
     760          tr = self.read_pipe_transport()
     761          tr._close = mock.Mock()
     762          tr.close()
     763          tr._close.assert_called_with(None)
     764  
     765      @mock.patch('os.read')
     766      def test_close_already_closing(self, m_read):
     767          tr = self.read_pipe_transport()
     768          tr._closing = True
     769          tr._close = mock.Mock()
     770          tr.close()
     771          self.assertFalse(tr._close.called)
     772  
     773      @mock.patch('os.read')
     774      def test__close(self, m_read):
     775          tr = self.read_pipe_transport()
     776          err = object()
     777          tr._close(err)
     778          self.assertTrue(tr.is_closing())
     779          self.assertFalse(self.loop.readers)
     780          test_utils.run_briefly(self.loop)
     781          self.protocol.connection_lost.assert_called_with(err)
     782  
     783      def test__call_connection_lost(self):
     784          tr = self.read_pipe_transport()
     785          self.assertIsNotNone(tr._protocol)
     786          self.assertIsNotNone(tr._loop)
     787  
     788          err = None
     789          tr._call_connection_lost(err)
     790          self.protocol.connection_lost.assert_called_with(err)
     791          self.pipe.close.assert_called_with()
     792  
     793          self.assertIsNone(tr._protocol)
     794          self.assertIsNone(tr._loop)
     795  
     796      def test__call_connection_lost_with_err(self):
     797          tr = self.read_pipe_transport()
     798          self.assertIsNotNone(tr._protocol)
     799          self.assertIsNotNone(tr._loop)
     800  
     801          err = OSError()
     802          tr._call_connection_lost(err)
     803          self.protocol.connection_lost.assert_called_with(err)
     804          self.pipe.close.assert_called_with()
     805  
     806          self.assertIsNone(tr._protocol)
     807          self.assertIsNone(tr._loop)
     808  
     809      def test_pause_reading_on_closed_pipe(self):
     810          tr = self.read_pipe_transport()
     811          tr.close()
     812          test_utils.run_briefly(self.loop)
     813          self.assertIsNone(tr._loop)
     814          tr.pause_reading()
     815  
     816      def test_pause_reading_on_paused_pipe(self):
     817          tr = self.read_pipe_transport()
     818          tr.pause_reading()
     819          # the second call should do nothing
     820          tr.pause_reading()
     821  
     822      def test_resume_reading_on_closed_pipe(self):
     823          tr = self.read_pipe_transport()
     824          tr.close()
     825          test_utils.run_briefly(self.loop)
     826          self.assertIsNone(tr._loop)
     827          tr.resume_reading()
     828  
     829      def test_resume_reading_on_paused_pipe(self):
     830          tr = self.read_pipe_transport()
     831          # the pipe is not paused
     832          # resuming should do nothing
     833          tr.resume_reading()
     834  
     835  
     836  class ESC[4;38;5;81mUnixWritePipeTransportTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     837  
     838      def setUp(self):
     839          super().setUp()
     840          self.loop = self.new_test_loop()
     841          self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
     842          self.pipe = mock.Mock(spec_set=io.RawIOBase)
     843          self.pipe.fileno.return_value = 5
     844  
     845          blocking_patcher = mock.patch('os.set_blocking')
     846          blocking_patcher.start()
     847          self.addCleanup(blocking_patcher.stop)
     848  
     849          fstat_patcher = mock.patch('os.fstat')
     850          m_fstat = fstat_patcher.start()
     851          st = mock.Mock()
     852          st.st_mode = stat.S_IFSOCK
     853          m_fstat.return_value = st
     854          self.addCleanup(fstat_patcher.stop)
     855  
     856      def write_pipe_transport(self, waiter=None):
     857          transport = unix_events._UnixWritePipeTransport(self.loop, self.pipe,
     858                                                          self.protocol,
     859                                                          waiter=waiter)
     860          self.addCleanup(close_pipe_transport, transport)
     861          return transport
     862  
     863      def test_ctor(self):
     864          waiter = self.loop.create_future()
     865          tr = self.write_pipe_transport(waiter=waiter)
     866          self.loop.run_until_complete(waiter)
     867  
     868          self.protocol.connection_made.assert_called_with(tr)
     869          self.loop.assert_reader(5, tr._read_ready)
     870          self.assertEqual(None, waiter.result())
     871  
     872      def test_can_write_eof(self):
     873          tr = self.write_pipe_transport()
     874          self.assertTrue(tr.can_write_eof())
     875  
     876      @mock.patch('os.write')
     877      def test_write(self, m_write):
     878          tr = self.write_pipe_transport()
     879          m_write.return_value = 4
     880          tr.write(b'data')
     881          m_write.assert_called_with(5, b'data')
     882          self.assertFalse(self.loop.writers)
     883          self.assertEqual(bytearray(), tr._buffer)
     884  
     885      @mock.patch('os.write')
     886      def test_write_no_data(self, m_write):
     887          tr = self.write_pipe_transport()
     888          tr.write(b'')
     889          self.assertFalse(m_write.called)
     890          self.assertFalse(self.loop.writers)
     891          self.assertEqual(bytearray(b''), tr._buffer)
     892  
     893      @mock.patch('os.write')
     894      def test_write_partial(self, m_write):
     895          tr = self.write_pipe_transport()
     896          m_write.return_value = 2
     897          tr.write(b'data')
     898          self.loop.assert_writer(5, tr._write_ready)
     899          self.assertEqual(bytearray(b'ta'), tr._buffer)
     900  
     901      @mock.patch('os.write')
     902      def test_write_buffer(self, m_write):
     903          tr = self.write_pipe_transport()
     904          self.loop.add_writer(5, tr._write_ready)
     905          tr._buffer = bytearray(b'previous')
     906          tr.write(b'data')
     907          self.assertFalse(m_write.called)
     908          self.loop.assert_writer(5, tr._write_ready)
     909          self.assertEqual(bytearray(b'previousdata'), tr._buffer)
     910  
     911      @mock.patch('os.write')
     912      def test_write_again(self, m_write):
     913          tr = self.write_pipe_transport()
     914          m_write.side_effect = BlockingIOError()
     915          tr.write(b'data')
     916          m_write.assert_called_with(5, bytearray(b'data'))
     917          self.loop.assert_writer(5, tr._write_ready)
     918          self.assertEqual(bytearray(b'data'), tr._buffer)
     919  
     920      @mock.patch('asyncio.unix_events.logger')
     921      @mock.patch('os.write')
     922      def test_write_err(self, m_write, m_log):
     923          tr = self.write_pipe_transport()
     924          err = OSError()
     925          m_write.side_effect = err
     926          tr._fatal_error = mock.Mock()
     927          tr.write(b'data')
     928          m_write.assert_called_with(5, b'data')
     929          self.assertFalse(self.loop.writers)
     930          self.assertEqual(bytearray(), tr._buffer)
     931          tr._fatal_error.assert_called_with(
     932                              err,
     933                              'Fatal write error on pipe transport')
     934          self.assertEqual(1, tr._conn_lost)
     935  
     936          tr.write(b'data')
     937          self.assertEqual(2, tr._conn_lost)
     938          tr.write(b'data')
     939          tr.write(b'data')
     940          tr.write(b'data')
     941          tr.write(b'data')
     942          # This is a bit overspecified. :-(
     943          m_log.warning.assert_called_with(
     944              'pipe closed by peer or os.write(pipe, data) raised exception.')
     945          tr.close()
     946  
     947      @mock.patch('os.write')
     948      def test_write_close(self, m_write):
     949          tr = self.write_pipe_transport()
     950          tr._read_ready()  # pipe was closed by peer
     951  
     952          tr.write(b'data')
     953          self.assertEqual(tr._conn_lost, 1)
     954          tr.write(b'data')
     955          self.assertEqual(tr._conn_lost, 2)
     956  
     957      def test__read_ready(self):
     958          tr = self.write_pipe_transport()
     959          tr._read_ready()
     960          self.assertFalse(self.loop.readers)
     961          self.assertFalse(self.loop.writers)
     962          self.assertTrue(tr.is_closing())
     963          test_utils.run_briefly(self.loop)
     964          self.protocol.connection_lost.assert_called_with(None)
     965  
     966      @mock.patch('os.write')
     967      def test__write_ready(self, m_write):
     968          tr = self.write_pipe_transport()
     969          self.loop.add_writer(5, tr._write_ready)
     970          tr._buffer = bytearray(b'data')
     971          m_write.return_value = 4
     972          tr._write_ready()
     973          self.assertFalse(self.loop.writers)
     974          self.assertEqual(bytearray(), tr._buffer)
     975  
     976      @mock.patch('os.write')
     977      def test__write_ready_partial(self, m_write):
     978          tr = self.write_pipe_transport()
     979          self.loop.add_writer(5, tr._write_ready)
     980          tr._buffer = bytearray(b'data')
     981          m_write.return_value = 3
     982          tr._write_ready()
     983          self.loop.assert_writer(5, tr._write_ready)
     984          self.assertEqual(bytearray(b'a'), tr._buffer)
     985  
     986      @mock.patch('os.write')
     987      def test__write_ready_again(self, m_write):
     988          tr = self.write_pipe_transport()
     989          self.loop.add_writer(5, tr._write_ready)
     990          tr._buffer = bytearray(b'data')
     991          m_write.side_effect = BlockingIOError()
     992          tr._write_ready()
     993          m_write.assert_called_with(5, bytearray(b'data'))
     994          self.loop.assert_writer(5, tr._write_ready)
     995          self.assertEqual(bytearray(b'data'), tr._buffer)
     996  
     997      @mock.patch('os.write')
     998      def test__write_ready_empty(self, m_write):
     999          tr = self.write_pipe_transport()
    1000          self.loop.add_writer(5, tr._write_ready)
    1001          tr._buffer = bytearray(b'data')
    1002          m_write.return_value = 0
    1003          tr._write_ready()
    1004          m_write.assert_called_with(5, bytearray(b'data'))
    1005          self.loop.assert_writer(5, tr._write_ready)
    1006          self.assertEqual(bytearray(b'data'), tr._buffer)
    1007  
    1008      @mock.patch('asyncio.log.logger.error')
    1009      @mock.patch('os.write')
    1010      def test__write_ready_err(self, m_write, m_logexc):
    1011          tr = self.write_pipe_transport()
    1012          self.loop.add_writer(5, tr._write_ready)
    1013          tr._buffer = bytearray(b'data')
    1014          m_write.side_effect = err = OSError()
    1015          tr._write_ready()
    1016          self.assertFalse(self.loop.writers)
    1017          self.assertFalse(self.loop.readers)
    1018          self.assertEqual(bytearray(), tr._buffer)
    1019          self.assertTrue(tr.is_closing())
    1020          m_logexc.assert_not_called()
    1021          self.assertEqual(1, tr._conn_lost)
    1022          test_utils.run_briefly(self.loop)
    1023          self.protocol.connection_lost.assert_called_with(err)
    1024  
    1025      @mock.patch('os.write')
    1026      def test__write_ready_closing(self, m_write):
    1027          tr = self.write_pipe_transport()
    1028          self.loop.add_writer(5, tr._write_ready)
    1029          tr._closing = True
    1030          tr._buffer = bytearray(b'data')
    1031          m_write.return_value = 4
    1032          tr._write_ready()
    1033          self.assertFalse(self.loop.writers)
    1034          self.assertFalse(self.loop.readers)
    1035          self.assertEqual(bytearray(), tr._buffer)
    1036          self.protocol.connection_lost.assert_called_with(None)
    1037          self.pipe.close.assert_called_with()
    1038  
    1039      @mock.patch('os.write')
    1040      def test_abort(self, m_write):
    1041          tr = self.write_pipe_transport()
    1042          self.loop.add_writer(5, tr._write_ready)
    1043          self.loop.add_reader(5, tr._read_ready)
    1044          tr._buffer = [b'da', b'ta']
    1045          tr.abort()
    1046          self.assertFalse(m_write.called)
    1047          self.assertFalse(self.loop.readers)
    1048          self.assertFalse(self.loop.writers)
    1049          self.assertEqual([], tr._buffer)
    1050          self.assertTrue(tr.is_closing())
    1051          test_utils.run_briefly(self.loop)
    1052          self.protocol.connection_lost.assert_called_with(None)
    1053  
    1054      def test__call_connection_lost(self):
    1055          tr = self.write_pipe_transport()
    1056          self.assertIsNotNone(tr._protocol)
    1057          self.assertIsNotNone(tr._loop)
    1058  
    1059          err = None
    1060          tr._call_connection_lost(err)
    1061          self.protocol.connection_lost.assert_called_with(err)
    1062          self.pipe.close.assert_called_with()
    1063  
    1064          self.assertIsNone(tr._protocol)
    1065          self.assertIsNone(tr._loop)
    1066  
    1067      def test__call_connection_lost_with_err(self):
    1068          tr = self.write_pipe_transport()
    1069          self.assertIsNotNone(tr._protocol)
    1070          self.assertIsNotNone(tr._loop)
    1071  
    1072          err = OSError()
    1073          tr._call_connection_lost(err)
    1074          self.protocol.connection_lost.assert_called_with(err)
    1075          self.pipe.close.assert_called_with()
    1076  
    1077          self.assertIsNone(tr._protocol)
    1078          self.assertIsNone(tr._loop)
    1079  
    1080      def test_close(self):
    1081          tr = self.write_pipe_transport()
    1082          tr.write_eof = mock.Mock()
    1083          tr.close()
    1084          tr.write_eof.assert_called_with()
    1085  
    1086          # closing the transport twice must not fail
    1087          tr.close()
    1088  
    1089      def test_close_closing(self):
    1090          tr = self.write_pipe_transport()
    1091          tr.write_eof = mock.Mock()
    1092          tr._closing = True
    1093          tr.close()
    1094          self.assertFalse(tr.write_eof.called)
    1095  
    1096      def test_write_eof(self):
    1097          tr = self.write_pipe_transport()
    1098          tr.write_eof()
    1099          self.assertTrue(tr.is_closing())
    1100          self.assertFalse(self.loop.readers)
    1101          test_utils.run_briefly(self.loop)
    1102          self.protocol.connection_lost.assert_called_with(None)
    1103  
    1104      def test_write_eof_pending(self):
    1105          tr = self.write_pipe_transport()
    1106          tr._buffer = [b'data']
    1107          tr.write_eof()
    1108          self.assertTrue(tr.is_closing())
    1109          self.assertFalse(self.protocol.connection_lost.called)
    1110  
    1111  
    1112  class ESC[4;38;5;81mAbstractChildWatcherTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1113  
    1114      def test_warns_on_subclassing(self):
    1115          with self.assertWarns(DeprecationWarning):
    1116              class ESC[4;38;5;81mMyWatcher(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mAbstractChildWatcher):
    1117                  pass
    1118  
    1119      def test_not_implemented(self):
    1120          f = mock.Mock()
    1121          watcher = asyncio.AbstractChildWatcher()
    1122          self.assertRaises(
    1123              NotImplementedError, watcher.add_child_handler, f, f)
    1124          self.assertRaises(
    1125              NotImplementedError, watcher.remove_child_handler, f)
    1126          self.assertRaises(
    1127              NotImplementedError, watcher.attach_loop, f)
    1128          self.assertRaises(
    1129              NotImplementedError, watcher.close)
    1130          self.assertRaises(
    1131              NotImplementedError, watcher.is_active)
    1132          self.assertRaises(
    1133              NotImplementedError, watcher.__enter__)
    1134          self.assertRaises(
    1135              NotImplementedError, watcher.__exit__, f, f, f)
    1136  
    1137  
    1138  class ESC[4;38;5;81mBaseChildWatcherTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1139  
    1140      def test_not_implemented(self):
    1141          f = mock.Mock()
    1142          watcher = unix_events.BaseChildWatcher()
    1143          self.assertRaises(
    1144              NotImplementedError, watcher._do_waitpid, f)
    1145  
    1146  
    1147  class ESC[4;38;5;81mChildWatcherTestsMixin:
    1148  
    1149      ignore_warnings = mock.patch.object(log.logger, "warning")
    1150  
    1151      def setUp(self):
    1152          super().setUp()
    1153          self.loop = self.new_test_loop()
    1154          self.running = False
    1155          self.zombies = {}
    1156  
    1157          with mock.patch.object(
    1158                  self.loop, "add_signal_handler") as self.m_add_signal_handler:
    1159              self.watcher = self.create_watcher()
    1160              self.watcher.attach_loop(self.loop)
    1161  
    1162      def waitpid(self, pid, flags):
    1163          if isinstance(self.watcher, asyncio.SafeChildWatcher) or pid != -1:
    1164              self.assertGreater(pid, 0)
    1165          try:
    1166              if pid < 0:
    1167                  return self.zombies.popitem()
    1168              else:
    1169                  return pid, self.zombies.pop(pid)
    1170          except KeyError:
    1171              pass
    1172          if self.running:
    1173              return 0, 0
    1174          else:
    1175              raise ChildProcessError()
    1176  
    1177      def add_zombie(self, pid, status):
    1178          self.zombies[pid] = status
    1179  
    1180      def waitstatus_to_exitcode(self, status):
    1181          if status > 32768:
    1182              return status - 32768
    1183          elif 32700 < status < 32768:
    1184              return status - 32768
    1185          else:
    1186              return status
    1187  
    1188      def test_create_watcher(self):
    1189          self.m_add_signal_handler.assert_called_once_with(
    1190              signal.SIGCHLD, self.watcher._sig_chld)
    1191  
    1192      def waitpid_mocks(func):
    1193          def wrapped_func(self):
    1194              def patch(target, wrapper):
    1195                  return mock.patch(target, wraps=wrapper,
    1196                                    new_callable=mock.Mock)
    1197  
    1198              with patch('asyncio.unix_events.waitstatus_to_exitcode', self.waitstatus_to_exitcode), \
    1199                   patch('os.waitpid', self.waitpid) as m_waitpid:
    1200                  func(self, m_waitpid)
    1201          return wrapped_func
    1202  
    1203      @waitpid_mocks
    1204      def test_sigchld(self, m_waitpid):
    1205          # register a child
    1206          callback = mock.Mock()
    1207  
    1208          with self.watcher:
    1209              self.running = True
    1210              self.watcher.add_child_handler(42, callback, 9, 10, 14)
    1211  
    1212          self.assertFalse(callback.called)
    1213  
    1214          # child is running
    1215          self.watcher._sig_chld()
    1216  
    1217          self.assertFalse(callback.called)
    1218  
    1219          # child terminates (returncode 12)
    1220          self.running = False
    1221          self.add_zombie(42, EXITCODE(12))
    1222          self.watcher._sig_chld()
    1223  
    1224          callback.assert_called_once_with(42, 12, 9, 10, 14)
    1225  
    1226          callback.reset_mock()
    1227  
    1228          # ensure that the child is effectively reaped
    1229          self.add_zombie(42, EXITCODE(13))
    1230          with self.ignore_warnings:
    1231              self.watcher._sig_chld()
    1232  
    1233          self.assertFalse(callback.called)
    1234  
    1235          # sigchld called again
    1236          self.zombies.clear()
    1237          self.watcher._sig_chld()
    1238  
    1239          self.assertFalse(callback.called)
    1240  
    1241      @waitpid_mocks
    1242      def test_sigchld_two_children(self, m_waitpid):
    1243          callback1 = mock.Mock()
    1244          callback2 = mock.Mock()
    1245  
    1246          # register child 1
    1247          with self.watcher:
    1248              self.running = True
    1249              self.watcher.add_child_handler(43, callback1, 7, 8)
    1250  
    1251          self.assertFalse(callback1.called)
    1252          self.assertFalse(callback2.called)
    1253  
    1254          # register child 2
    1255          with self.watcher:
    1256              self.watcher.add_child_handler(44, callback2, 147, 18)
    1257  
    1258          self.assertFalse(callback1.called)
    1259          self.assertFalse(callback2.called)
    1260  
    1261          # children are running
    1262          self.watcher._sig_chld()
    1263  
    1264          self.assertFalse(callback1.called)
    1265          self.assertFalse(callback2.called)
    1266  
    1267          # child 1 terminates (signal 3)
    1268          self.add_zombie(43, SIGNAL(3))
    1269          self.watcher._sig_chld()
    1270  
    1271          callback1.assert_called_once_with(43, -3, 7, 8)
    1272          self.assertFalse(callback2.called)
    1273  
    1274          callback1.reset_mock()
    1275  
    1276          # child 2 still running
    1277          self.watcher._sig_chld()
    1278  
    1279          self.assertFalse(callback1.called)
    1280          self.assertFalse(callback2.called)
    1281  
    1282          # child 2 terminates (code 108)
    1283          self.add_zombie(44, EXITCODE(108))
    1284          self.running = False
    1285          self.watcher._sig_chld()
    1286  
    1287          callback2.assert_called_once_with(44, 108, 147, 18)
    1288          self.assertFalse(callback1.called)
    1289  
    1290          callback2.reset_mock()
    1291  
    1292          # ensure that the children are effectively reaped
    1293          self.add_zombie(43, EXITCODE(14))
    1294          self.add_zombie(44, EXITCODE(15))
    1295          with self.ignore_warnings:
    1296              self.watcher._sig_chld()
    1297  
    1298          self.assertFalse(callback1.called)
    1299          self.assertFalse(callback2.called)
    1300  
    1301          # sigchld called again
    1302          self.zombies.clear()
    1303          self.watcher._sig_chld()
    1304  
    1305          self.assertFalse(callback1.called)
    1306          self.assertFalse(callback2.called)
    1307  
    1308      @waitpid_mocks
    1309      def test_sigchld_two_children_terminating_together(self, m_waitpid):
    1310          callback1 = mock.Mock()
    1311          callback2 = mock.Mock()
    1312  
    1313          # register child 1
    1314          with self.watcher:
    1315              self.running = True
    1316              self.watcher.add_child_handler(45, callback1, 17, 8)
    1317  
    1318          self.assertFalse(callback1.called)
    1319          self.assertFalse(callback2.called)
    1320  
    1321          # register child 2
    1322          with self.watcher:
    1323              self.watcher.add_child_handler(46, callback2, 1147, 18)
    1324  
    1325          self.assertFalse(callback1.called)
    1326          self.assertFalse(callback2.called)
    1327  
    1328          # children are running
    1329          self.watcher._sig_chld()
    1330  
    1331          self.assertFalse(callback1.called)
    1332          self.assertFalse(callback2.called)
    1333  
    1334          # child 1 terminates (code 78)
    1335          # child 2 terminates (signal 5)
    1336          self.add_zombie(45, EXITCODE(78))
    1337          self.add_zombie(46, SIGNAL(5))
    1338          self.running = False
    1339          self.watcher._sig_chld()
    1340  
    1341          callback1.assert_called_once_with(45, 78, 17, 8)
    1342          callback2.assert_called_once_with(46, -5, 1147, 18)
    1343  
    1344          callback1.reset_mock()
    1345          callback2.reset_mock()
    1346  
    1347          # ensure that the children are effectively reaped
    1348          self.add_zombie(45, EXITCODE(14))
    1349          self.add_zombie(46, EXITCODE(15))
    1350          with self.ignore_warnings:
    1351              self.watcher._sig_chld()
    1352  
    1353          self.assertFalse(callback1.called)
    1354          self.assertFalse(callback2.called)
    1355  
    1356      @waitpid_mocks
    1357      def test_sigchld_race_condition(self, m_waitpid):
    1358          # register a child
    1359          callback = mock.Mock()
    1360  
    1361          with self.watcher:
    1362              # child terminates before being registered
    1363              self.add_zombie(50, EXITCODE(4))
    1364              self.watcher._sig_chld()
    1365  
    1366              self.watcher.add_child_handler(50, callback, 1, 12)
    1367  
    1368          callback.assert_called_once_with(50, 4, 1, 12)
    1369          callback.reset_mock()
    1370  
    1371          # ensure that the child is effectively reaped
    1372          self.add_zombie(50, SIGNAL(1))
    1373          with self.ignore_warnings:
    1374              self.watcher._sig_chld()
    1375  
    1376          self.assertFalse(callback.called)
    1377  
    1378      @waitpid_mocks
    1379      def test_sigchld_replace_handler(self, m_waitpid):
    1380          callback1 = mock.Mock()
    1381          callback2 = mock.Mock()
    1382  
    1383          # register a child
    1384          with self.watcher:
    1385              self.running = True
    1386              self.watcher.add_child_handler(51, callback1, 19)
    1387  
    1388          self.assertFalse(callback1.called)
    1389          self.assertFalse(callback2.called)
    1390  
    1391          # register the same child again
    1392          with self.watcher:
    1393              self.watcher.add_child_handler(51, callback2, 21)
    1394  
    1395          self.assertFalse(callback1.called)
    1396          self.assertFalse(callback2.called)
    1397  
    1398          # child terminates (signal 8)
    1399          self.running = False
    1400          self.add_zombie(51, SIGNAL(8))
    1401          self.watcher._sig_chld()
    1402  
    1403          callback2.assert_called_once_with(51, -8, 21)
    1404          self.assertFalse(callback1.called)
    1405  
    1406          callback2.reset_mock()
    1407  
    1408          # ensure that the child is effectively reaped
    1409          self.add_zombie(51, EXITCODE(13))
    1410          with self.ignore_warnings:
    1411              self.watcher._sig_chld()
    1412  
    1413          self.assertFalse(callback1.called)
    1414          self.assertFalse(callback2.called)
    1415  
    1416      @waitpid_mocks
    1417      def test_sigchld_remove_handler(self, m_waitpid):
    1418          callback = mock.Mock()
    1419  
    1420          # register a child
    1421          with self.watcher:
    1422              self.running = True
    1423              self.watcher.add_child_handler(52, callback, 1984)
    1424  
    1425          self.assertFalse(callback.called)
    1426  
    1427          # unregister the child
    1428          self.watcher.remove_child_handler(52)
    1429  
    1430          self.assertFalse(callback.called)
    1431  
    1432          # child terminates (code 99)
    1433          self.running = False
    1434          self.add_zombie(52, EXITCODE(99))
    1435          with self.ignore_warnings:
    1436              self.watcher._sig_chld()
    1437  
    1438          self.assertFalse(callback.called)
    1439  
    1440      @waitpid_mocks
    1441      def test_sigchld_unknown_status(self, m_waitpid):
    1442          callback = mock.Mock()
    1443  
    1444          # register a child
    1445          with self.watcher:
    1446              self.running = True
    1447              self.watcher.add_child_handler(53, callback, -19)
    1448  
    1449          self.assertFalse(callback.called)
    1450  
    1451          # terminate with unknown status
    1452          self.zombies[53] = 1178
    1453          self.running = False
    1454          self.watcher._sig_chld()
    1455  
    1456          callback.assert_called_once_with(53, 1178, -19)
    1457  
    1458          callback.reset_mock()
    1459  
    1460          # ensure that the child is effectively reaped
    1461          self.add_zombie(53, EXITCODE(101))
    1462          with self.ignore_warnings:
    1463              self.watcher._sig_chld()
    1464  
    1465          self.assertFalse(callback.called)
    1466  
    1467      @waitpid_mocks
    1468      def test_remove_child_handler(self, m_waitpid):
    1469          callback1 = mock.Mock()
    1470          callback2 = mock.Mock()
    1471          callback3 = mock.Mock()
    1472  
    1473          # register children
    1474          with self.watcher:
    1475              self.running = True
    1476              self.watcher.add_child_handler(54, callback1, 1)
    1477              self.watcher.add_child_handler(55, callback2, 2)
    1478              self.watcher.add_child_handler(56, callback3, 3)
    1479  
    1480          # remove child handler 1
    1481          self.assertTrue(self.watcher.remove_child_handler(54))
    1482  
    1483          # remove child handler 2 multiple times
    1484          self.assertTrue(self.watcher.remove_child_handler(55))
    1485          self.assertFalse(self.watcher.remove_child_handler(55))
    1486          self.assertFalse(self.watcher.remove_child_handler(55))
    1487  
    1488          # all children terminate
    1489          self.add_zombie(54, EXITCODE(0))
    1490          self.add_zombie(55, EXITCODE(1))
    1491          self.add_zombie(56, EXITCODE(2))
    1492          self.running = False
    1493          with self.ignore_warnings:
    1494              self.watcher._sig_chld()
    1495  
    1496          self.assertFalse(callback1.called)
    1497          self.assertFalse(callback2.called)
    1498          callback3.assert_called_once_with(56, 2, 3)
    1499  
    1500      @waitpid_mocks
    1501      def test_sigchld_unhandled_exception(self, m_waitpid):
    1502          callback = mock.Mock()
    1503  
    1504          # register a child
    1505          with self.watcher:
    1506              self.running = True
    1507              self.watcher.add_child_handler(57, callback)
    1508  
    1509          # raise an exception
    1510          m_waitpid.side_effect = ValueError
    1511  
    1512          with mock.patch.object(log.logger,
    1513                                 'error') as m_error:
    1514  
    1515              self.assertEqual(self.watcher._sig_chld(), None)
    1516              self.assertTrue(m_error.called)
    1517  
    1518      @waitpid_mocks
    1519      def test_sigchld_child_reaped_elsewhere(self, m_waitpid):
    1520          # register a child
    1521          callback = mock.Mock()
    1522  
    1523          with self.watcher:
    1524              self.running = True
    1525              self.watcher.add_child_handler(58, callback)
    1526  
    1527          self.assertFalse(callback.called)
    1528  
    1529          # child terminates
    1530          self.running = False
    1531          self.add_zombie(58, EXITCODE(4))
    1532  
    1533          # waitpid is called elsewhere
    1534          os.waitpid(58, os.WNOHANG)
    1535  
    1536          m_waitpid.reset_mock()
    1537  
    1538          # sigchld
    1539          with self.ignore_warnings:
    1540              self.watcher._sig_chld()
    1541  
    1542          if isinstance(self.watcher, asyncio.FastChildWatcher):
    1543              # here the FastChildWatcher enters a deadlock
    1544              # (there is no way to prevent it)
    1545              self.assertFalse(callback.called)
    1546          else:
    1547              callback.assert_called_once_with(58, 255)
    1548  
    1549      @waitpid_mocks
    1550      def test_sigchld_unknown_pid_during_registration(self, m_waitpid):
    1551          # register two children
    1552          callback1 = mock.Mock()
    1553          callback2 = mock.Mock()
    1554  
    1555          with self.ignore_warnings, self.watcher:
    1556              self.running = True
    1557              # child 1 terminates
    1558              self.add_zombie(591, EXITCODE(7))
    1559              # an unknown child terminates
    1560              self.add_zombie(593, EXITCODE(17))
    1561  
    1562              self.watcher._sig_chld()
    1563  
    1564              self.watcher.add_child_handler(591, callback1)
    1565              self.watcher.add_child_handler(592, callback2)
    1566  
    1567          callback1.assert_called_once_with(591, 7)
    1568          self.assertFalse(callback2.called)
    1569  
    1570      @waitpid_mocks
    1571      def test_set_loop(self, m_waitpid):
    1572          # register a child
    1573          callback = mock.Mock()
    1574  
    1575          with self.watcher:
    1576              self.running = True
    1577              self.watcher.add_child_handler(60, callback)
    1578  
    1579          # attach a new loop
    1580          old_loop = self.loop
    1581          self.loop = self.new_test_loop()
    1582          patch = mock.patch.object
    1583  
    1584          with patch(old_loop, "remove_signal_handler") as m_old_remove, \
    1585               patch(self.loop, "add_signal_handler") as m_new_add:
    1586  
    1587              self.watcher.attach_loop(self.loop)
    1588  
    1589              m_old_remove.assert_called_once_with(
    1590                  signal.SIGCHLD)
    1591              m_new_add.assert_called_once_with(
    1592                  signal.SIGCHLD, self.watcher._sig_chld)
    1593  
    1594          # child terminates
    1595          self.running = False
    1596          self.add_zombie(60, EXITCODE(9))
    1597          self.watcher._sig_chld()
    1598  
    1599          callback.assert_called_once_with(60, 9)
    1600  
    1601      @waitpid_mocks
    1602      def test_set_loop_race_condition(self, m_waitpid):
    1603          # register 3 children
    1604          callback1 = mock.Mock()
    1605          callback2 = mock.Mock()
    1606          callback3 = mock.Mock()
    1607  
    1608          with self.watcher:
    1609              self.running = True
    1610              self.watcher.add_child_handler(61, callback1)
    1611              self.watcher.add_child_handler(62, callback2)
    1612              self.watcher.add_child_handler(622, callback3)
    1613  
    1614          # detach the loop
    1615          old_loop = self.loop
    1616          self.loop = None
    1617  
    1618          with mock.patch.object(
    1619                  old_loop, "remove_signal_handler") as m_remove_signal_handler:
    1620  
    1621              with self.assertWarnsRegex(
    1622                      RuntimeWarning, 'A loop is being detached'):
    1623                  self.watcher.attach_loop(None)
    1624  
    1625              m_remove_signal_handler.assert_called_once_with(
    1626                  signal.SIGCHLD)
    1627  
    1628          # child 1 & 2 terminate
    1629          self.add_zombie(61, EXITCODE(11))
    1630          self.add_zombie(62, SIGNAL(5))
    1631  
    1632          # SIGCHLD was not caught
    1633          self.assertFalse(callback1.called)
    1634          self.assertFalse(callback2.called)
    1635          self.assertFalse(callback3.called)
    1636  
    1637          # attach a new loop
    1638          self.loop = self.new_test_loop()
    1639  
    1640          with mock.patch.object(
    1641                  self.loop, "add_signal_handler") as m_add_signal_handler:
    1642  
    1643              self.watcher.attach_loop(self.loop)
    1644  
    1645              m_add_signal_handler.assert_called_once_with(
    1646                  signal.SIGCHLD, self.watcher._sig_chld)
    1647              callback1.assert_called_once_with(61, 11)  # race condition!
    1648              callback2.assert_called_once_with(62, -5)  # race condition!
    1649              self.assertFalse(callback3.called)
    1650  
    1651          callback1.reset_mock()
    1652          callback2.reset_mock()
    1653  
    1654          # child 3 terminates
    1655          self.running = False
    1656          self.add_zombie(622, EXITCODE(19))
    1657          self.watcher._sig_chld()
    1658  
    1659          self.assertFalse(callback1.called)
    1660          self.assertFalse(callback2.called)
    1661          callback3.assert_called_once_with(622, 19)
    1662  
    1663      @waitpid_mocks
    1664      def test_close(self, m_waitpid):
    1665          # register two children
    1666          callback1 = mock.Mock()
    1667  
    1668          with self.watcher:
    1669              self.running = True
    1670              # child 1 terminates
    1671              self.add_zombie(63, EXITCODE(9))
    1672              # other child terminates
    1673              self.add_zombie(65, EXITCODE(18))
    1674              self.watcher._sig_chld()
    1675  
    1676              self.watcher.add_child_handler(63, callback1)
    1677              self.watcher.add_child_handler(64, callback1)
    1678  
    1679              self.assertEqual(len(self.watcher._callbacks), 1)
    1680              if isinstance(self.watcher, asyncio.FastChildWatcher):
    1681                  self.assertEqual(len(self.watcher._zombies), 1)
    1682  
    1683              with mock.patch.object(
    1684                      self.loop,
    1685                      "remove_signal_handler") as m_remove_signal_handler:
    1686  
    1687                  self.watcher.close()
    1688  
    1689                  m_remove_signal_handler.assert_called_once_with(
    1690                      signal.SIGCHLD)
    1691                  self.assertFalse(self.watcher._callbacks)
    1692                  if isinstance(self.watcher, asyncio.FastChildWatcher):
    1693                      self.assertFalse(self.watcher._zombies)
    1694  
    1695  
    1696  class ESC[4;38;5;81mSafeChildWatcherTests (ESC[4;38;5;149mChildWatcherTestsMixin, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1697      def create_watcher(self):
    1698          with warnings.catch_warnings():
    1699              warnings.simplefilter("ignore", DeprecationWarning)
    1700              return asyncio.SafeChildWatcher()
    1701  
    1702  
    1703  class ESC[4;38;5;81mFastChildWatcherTests (ESC[4;38;5;149mChildWatcherTestsMixin, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1704      def create_watcher(self):
    1705          with warnings.catch_warnings():
    1706              warnings.simplefilter("ignore", DeprecationWarning)
    1707              return asyncio.FastChildWatcher()
    1708  
    1709  
    1710  class ESC[4;38;5;81mPolicyTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1711  
    1712      def create_policy(self):
    1713          return asyncio.DefaultEventLoopPolicy()
    1714  
    1715      @mock.patch('asyncio.unix_events.can_use_pidfd')
    1716      def test_get_default_child_watcher(self, m_can_use_pidfd):
    1717          m_can_use_pidfd.return_value = False
    1718          policy = self.create_policy()
    1719          self.assertIsNone(policy._watcher)
    1720          with self.assertWarns(DeprecationWarning):
    1721              watcher = policy.get_child_watcher()
    1722          self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher)
    1723  
    1724          self.assertIs(policy._watcher, watcher)
    1725          with self.assertWarns(DeprecationWarning):
    1726              self.assertIs(watcher, policy.get_child_watcher())
    1727  
    1728          m_can_use_pidfd.return_value = True
    1729          policy = self.create_policy()
    1730          self.assertIsNone(policy._watcher)
    1731          with self.assertWarns(DeprecationWarning):
    1732              watcher = policy.get_child_watcher()
    1733          self.assertIsInstance(watcher, asyncio.PidfdChildWatcher)
    1734  
    1735          self.assertIs(policy._watcher, watcher)
    1736          with self.assertWarns(DeprecationWarning):
    1737              self.assertIs(watcher, policy.get_child_watcher())
    1738  
    1739      def test_get_child_watcher_after_set(self):
    1740          policy = self.create_policy()
    1741          with warnings.catch_warnings():
    1742              warnings.simplefilter("ignore", DeprecationWarning)
    1743              watcher = asyncio.FastChildWatcher()
    1744              policy.set_child_watcher(watcher)
    1745  
    1746          self.assertIs(policy._watcher, watcher)
    1747          with self.assertWarns(DeprecationWarning):
    1748              self.assertIs(watcher, policy.get_child_watcher())
    1749  
    1750      def test_get_child_watcher_thread(self):
    1751  
    1752          def f():
    1753              policy.set_event_loop(policy.new_event_loop())
    1754  
    1755              self.assertIsInstance(policy.get_event_loop(),
    1756                                    asyncio.AbstractEventLoop)
    1757              with warnings.catch_warnings():
    1758                  warnings.simplefilter("ignore", DeprecationWarning)
    1759                  watcher = policy.get_child_watcher()
    1760  
    1761              self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
    1762              self.assertIsNone(watcher._loop)
    1763  
    1764              policy.get_event_loop().close()
    1765  
    1766          policy = self.create_policy()
    1767          with warnings.catch_warnings():
    1768              warnings.simplefilter("ignore", DeprecationWarning)
    1769              policy.set_child_watcher(asyncio.SafeChildWatcher())
    1770  
    1771          th = threading.Thread(target=f)
    1772          th.start()
    1773          th.join()
    1774  
    1775      def test_child_watcher_replace_mainloop_existing(self):
    1776          policy = self.create_policy()
    1777          loop = policy.new_event_loop()
    1778          policy.set_event_loop(loop)
    1779  
    1780          # Explicitly setup SafeChildWatcher,
    1781          # default ThreadedChildWatcher has no _loop property
    1782          with warnings.catch_warnings():
    1783              warnings.simplefilter("ignore", DeprecationWarning)
    1784              watcher = asyncio.SafeChildWatcher()
    1785              policy.set_child_watcher(watcher)
    1786          watcher.attach_loop(loop)
    1787  
    1788          self.assertIs(watcher._loop, loop)
    1789  
    1790          new_loop = policy.new_event_loop()
    1791          policy.set_event_loop(new_loop)
    1792  
    1793          self.assertIs(watcher._loop, new_loop)
    1794  
    1795          policy.set_event_loop(None)
    1796  
    1797          self.assertIs(watcher._loop, None)
    1798  
    1799          loop.close()
    1800          new_loop.close()
    1801  
    1802  
    1803  class ESC[4;38;5;81mTestFunctional(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1804  
    1805      def setUp(self):
    1806          self.loop = asyncio.new_event_loop()
    1807          asyncio.set_event_loop(self.loop)
    1808  
    1809      def tearDown(self):
    1810          self.loop.close()
    1811          asyncio.set_event_loop(None)
    1812  
    1813      def test_add_reader_invalid_argument(self):
    1814          def assert_raises():
    1815              return self.assertRaisesRegex(ValueError, r'Invalid file object')
    1816  
    1817          cb = lambda: None
    1818  
    1819          with assert_raises():
    1820              self.loop.add_reader(object(), cb)
    1821          with assert_raises():
    1822              self.loop.add_writer(object(), cb)
    1823  
    1824          with assert_raises():
    1825              self.loop.remove_reader(object())
    1826          with assert_raises():
    1827              self.loop.remove_writer(object())
    1828  
    1829      def test_add_reader_or_writer_transport_fd(self):
    1830          def assert_raises():
    1831              return self.assertRaisesRegex(
    1832                  RuntimeError,
    1833                  r'File descriptor .* is used by transport')
    1834  
    1835          async def runner():
    1836              tr, pr = await self.loop.create_connection(
    1837                  lambda: asyncio.Protocol(), sock=rsock)
    1838  
    1839              try:
    1840                  cb = lambda: None
    1841  
    1842                  with assert_raises():
    1843                      self.loop.add_reader(rsock, cb)
    1844                  with assert_raises():
    1845                      self.loop.add_reader(rsock.fileno(), cb)
    1846  
    1847                  with assert_raises():
    1848                      self.loop.remove_reader(rsock)
    1849                  with assert_raises():
    1850                      self.loop.remove_reader(rsock.fileno())
    1851  
    1852                  with assert_raises():
    1853                      self.loop.add_writer(rsock, cb)
    1854                  with assert_raises():
    1855                      self.loop.add_writer(rsock.fileno(), cb)
    1856  
    1857                  with assert_raises():
    1858                      self.loop.remove_writer(rsock)
    1859                  with assert_raises():
    1860                      self.loop.remove_writer(rsock.fileno())
    1861  
    1862              finally:
    1863                  tr.close()
    1864  
    1865          rsock, wsock = socket.socketpair()
    1866          try:
    1867              self.loop.run_until_complete(runner())
    1868          finally:
    1869              rsock.close()
    1870              wsock.close()
    1871  
    1872  
    1873  @unittest.skipUnless(hasattr(os, 'fork'), 'requires os.fork()')
    1874  class ESC[4;38;5;81mTestFork(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
    1875  
    1876      async def test_fork_not_share_event_loop(self):
    1877          # The forked process should not share the event loop with the parent
    1878          loop = asyncio.get_running_loop()
    1879          r, w = os.pipe()
    1880          self.addCleanup(os.close, r)
    1881          self.addCleanup(os.close, w)
    1882          pid = os.fork()
    1883          if pid == 0:
    1884              # child
    1885              try:
    1886                  with self.assertWarns(DeprecationWarning):
    1887                      loop = asyncio.get_event_loop_policy().get_event_loop()
    1888                  os.write(w, b'LOOP:' + str(id(loop)).encode())
    1889              except RuntimeError:
    1890                  os.write(w, b'NO LOOP')
    1891              except BaseException as e:
    1892                  os.write(w, b'ERROR:' + ascii(e).encode())
    1893              finally:
    1894                  os._exit(0)
    1895          else:
    1896              # parent
    1897              result = os.read(r, 100)
    1898              self.assertEqual(result[:5], b'LOOP:', result)
    1899              self.assertNotEqual(int(result[5:]), id(loop))
    1900              wait_process(pid, exitcode=0)
    1901  
    1902      @hashlib_helper.requires_hashdigest('md5')
    1903      def test_fork_signal_handling(self):
    1904          # Sending signal to the forked process should not affect the parent
    1905          # process
    1906          ctx = multiprocessing.get_context('fork')
    1907          manager = ctx.Manager()
    1908          self.addCleanup(manager.shutdown)
    1909          child_started = manager.Event()
    1910          child_handled = manager.Event()
    1911          parent_handled = manager.Event()
    1912  
    1913          def child_main():
    1914              signal.signal(signal.SIGTERM, lambda *args: child_handled.set())
    1915              child_started.set()
    1916  
    1917          async def main():
    1918              loop = asyncio.get_running_loop()
    1919              loop.add_signal_handler(signal.SIGTERM, lambda *args: parent_handled.set())
    1920  
    1921              process = ctx.Process(target=child_main)
    1922              process.start()
    1923              child_started.wait()
    1924              os.kill(process.pid, signal.SIGTERM)
    1925              process.join()
    1926  
    1927              async def func():
    1928                  await asyncio.sleep(0.1)
    1929                  return 42
    1930  
    1931              # Test parent's loop is still functional
    1932              self.assertEqual(await asyncio.create_task(func()), 42)
    1933  
    1934          asyncio.run(main())
    1935  
    1936          self.assertFalse(parent_handled.is_set())
    1937          self.assertTrue(child_handled.is_set())
    1938  
    1939      @hashlib_helper.requires_hashdigest('md5')
    1940      def test_fork_asyncio_run(self):
    1941          ctx = multiprocessing.get_context('fork')
    1942          manager = ctx.Manager()
    1943          self.addCleanup(manager.shutdown)
    1944          result = manager.Value('i', 0)
    1945  
    1946          async def child_main():
    1947              await asyncio.sleep(0.1)
    1948              result.value = 42
    1949  
    1950          process = ctx.Process(target=lambda: asyncio.run(child_main()))
    1951          process.start()
    1952          process.join()
    1953  
    1954          self.assertEqual(result.value, 42)
    1955  
    1956      @hashlib_helper.requires_hashdigest('md5')
    1957      def test_fork_asyncio_subprocess(self):
    1958          ctx = multiprocessing.get_context('fork')
    1959          manager = ctx.Manager()
    1960          self.addCleanup(manager.shutdown)
    1961          result = manager.Value('i', 1)
    1962  
    1963          async def child_main():
    1964              proc = await asyncio.create_subprocess_exec(sys.executable, '-c', 'pass')
    1965              result.value = await proc.wait()
    1966  
    1967          process = ctx.Process(target=lambda: asyncio.run(child_main()))
    1968          process.start()
    1969          process.join()
    1970  
    1971          self.assertEqual(result.value, 0)
    1972  
    1973  if __name__ == '__main__':
    1974      unittest.main()