(root)/
Python-3.11.7/
Lib/
test/
test_asyncio/
test_unix_events.py
       1  """Tests for unix_events.py."""
       2  
       3  import contextlib
       4  import errno
       5  import io
       6  import os
       7  import pathlib
       8  import signal
       9  import socket
      10  import stat
      11  import sys
      12  import tempfile
      13  import threading
      14  import unittest
      15  from unittest import mock
      16  from test.support import os_helper
      17  from test.support import socket_helper
      18  
# Skip every test in this module when running on Windows.
if sys.platform == 'win32':
    raise unittest.SkipTest('UNIX only')
      21  
      22  
      23  import asyncio
      24  from asyncio import log
      25  from asyncio import unix_events
      26  from test.test_asyncio import utils as test_utils
      27  
      28  
      29  def tearDownModule():
      30      asyncio.set_event_loop_policy(None)
      31  
      32  
# Short module-level alias for mock.ANY.
MOCK_ANY = mock.ANY
      34  
      35  
      36  def EXITCODE(exitcode):
      37      return 32768 + exitcode
      38  
      39  
      40  def SIGNAL(signum):
      41      if not 1 <= signum <= 68:
      42          raise AssertionError(f'invalid signum {signum}')
      43      return 32768 - signum
      44  
      45  
      46  def close_pipe_transport(transport):
      47      # Don't call transport.close() because the event loop and the selector
      48      # are mocked
      49      if transport._pipe is None:
      50          return
      51      transport._pipe.close()
      52      transport._pipe = None
      53  
      54  
      55  @unittest.skipUnless(signal, 'Signals are not supported')
      56  class ESC[4;38;5;81mSelectorEventLoopSignalTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      57  
      58      def setUp(self):
      59          super().setUp()
      60          self.loop = asyncio.SelectorEventLoop()
      61          self.set_event_loop(self.loop)
      62  
      63      def test_check_signal(self):
      64          self.assertRaises(
      65              TypeError, self.loop._check_signal, '1')
      66          self.assertRaises(
      67              ValueError, self.loop._check_signal, signal.NSIG + 1)
      68  
      69      def test_handle_signal_no_handler(self):
      70          self.loop._handle_signal(signal.NSIG + 1)
      71  
      72      def test_handle_signal_cancelled_handler(self):
      73          h = asyncio.Handle(mock.Mock(), (),
      74                             loop=mock.Mock())
      75          h.cancel()
      76          self.loop._signal_handlers[signal.NSIG + 1] = h
      77          self.loop.remove_signal_handler = mock.Mock()
      78          self.loop._handle_signal(signal.NSIG + 1)
      79          self.loop.remove_signal_handler.assert_called_with(signal.NSIG + 1)
      80  
      81      @mock.patch('asyncio.unix_events.signal')
      82      def test_add_signal_handler_setup_error(self, m_signal):
      83          m_signal.NSIG = signal.NSIG
      84          m_signal.valid_signals = signal.valid_signals
      85          m_signal.set_wakeup_fd.side_effect = ValueError
      86  
      87          self.assertRaises(
      88              RuntimeError,
      89              self.loop.add_signal_handler,
      90              signal.SIGINT, lambda: True)
      91  
      92      @mock.patch('asyncio.unix_events.signal')
      93      def test_add_signal_handler_coroutine_error(self, m_signal):
      94          m_signal.NSIG = signal.NSIG
      95  
      96          async def simple_coroutine():
      97              pass
      98  
      99          # callback must not be a coroutine function
     100          coro_func = simple_coroutine
     101          coro_obj = coro_func()
     102          self.addCleanup(coro_obj.close)
     103          for func in (coro_func, coro_obj):
     104              self.assertRaisesRegex(
     105                  TypeError, 'coroutines cannot be used with add_signal_handler',
     106                  self.loop.add_signal_handler,
     107                  signal.SIGINT, func)
     108  
     109      @mock.patch('asyncio.unix_events.signal')
     110      def test_add_signal_handler(self, m_signal):
     111          m_signal.NSIG = signal.NSIG
     112          m_signal.valid_signals = signal.valid_signals
     113  
     114          cb = lambda: True
     115          self.loop.add_signal_handler(signal.SIGHUP, cb)
     116          h = self.loop._signal_handlers.get(signal.SIGHUP)
     117          self.assertIsInstance(h, asyncio.Handle)
     118          self.assertEqual(h._callback, cb)
     119  
     120      @mock.patch('asyncio.unix_events.signal')
     121      def test_add_signal_handler_install_error(self, m_signal):
     122          m_signal.NSIG = signal.NSIG
     123          m_signal.valid_signals = signal.valid_signals
     124  
     125          def set_wakeup_fd(fd):
     126              if fd == -1:
     127                  raise ValueError()
     128          m_signal.set_wakeup_fd = set_wakeup_fd
     129  
     130          class ESC[4;38;5;81mErr(ESC[4;38;5;149mOSError):
     131              errno = errno.EFAULT
     132          m_signal.signal.side_effect = Err
     133  
     134          self.assertRaises(
     135              Err,
     136              self.loop.add_signal_handler,
     137              signal.SIGINT, lambda: True)
     138  
     139      @mock.patch('asyncio.unix_events.signal')
     140      @mock.patch('asyncio.base_events.logger')
     141      def test_add_signal_handler_install_error2(self, m_logging, m_signal):
     142          m_signal.NSIG = signal.NSIG
     143          m_signal.valid_signals = signal.valid_signals
     144  
     145          class ESC[4;38;5;81mErr(ESC[4;38;5;149mOSError):
     146              errno = errno.EINVAL
     147          m_signal.signal.side_effect = Err
     148  
     149          self.loop._signal_handlers[signal.SIGHUP] = lambda: True
     150          self.assertRaises(
     151              RuntimeError,
     152              self.loop.add_signal_handler,
     153              signal.SIGINT, lambda: True)
     154          self.assertFalse(m_logging.info.called)
     155          self.assertEqual(1, m_signal.set_wakeup_fd.call_count)
     156  
     157      @mock.patch('asyncio.unix_events.signal')
     158      @mock.patch('asyncio.base_events.logger')
     159      def test_add_signal_handler_install_error3(self, m_logging, m_signal):
     160          class ESC[4;38;5;81mErr(ESC[4;38;5;149mOSError):
     161              errno = errno.EINVAL
     162          m_signal.signal.side_effect = Err
     163          m_signal.NSIG = signal.NSIG
     164          m_signal.valid_signals = signal.valid_signals
     165  
     166          self.assertRaises(
     167              RuntimeError,
     168              self.loop.add_signal_handler,
     169              signal.SIGINT, lambda: True)
     170          self.assertFalse(m_logging.info.called)
     171          self.assertEqual(2, m_signal.set_wakeup_fd.call_count)
     172  
     173      @mock.patch('asyncio.unix_events.signal')
     174      def test_remove_signal_handler(self, m_signal):
     175          m_signal.NSIG = signal.NSIG
     176          m_signal.valid_signals = signal.valid_signals
     177  
     178          self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
     179  
     180          self.assertTrue(
     181              self.loop.remove_signal_handler(signal.SIGHUP))
     182          self.assertTrue(m_signal.set_wakeup_fd.called)
     183          self.assertTrue(m_signal.signal.called)
     184          self.assertEqual(
     185              (signal.SIGHUP, m_signal.SIG_DFL), m_signal.signal.call_args[0])
     186  
     187      @mock.patch('asyncio.unix_events.signal')
     188      def test_remove_signal_handler_2(self, m_signal):
     189          m_signal.NSIG = signal.NSIG
     190          m_signal.SIGINT = signal.SIGINT
     191          m_signal.valid_signals = signal.valid_signals
     192  
     193          self.loop.add_signal_handler(signal.SIGINT, lambda: True)
     194          self.loop._signal_handlers[signal.SIGHUP] = object()
     195          m_signal.set_wakeup_fd.reset_mock()
     196  
     197          self.assertTrue(
     198              self.loop.remove_signal_handler(signal.SIGINT))
     199          self.assertFalse(m_signal.set_wakeup_fd.called)
     200          self.assertTrue(m_signal.signal.called)
     201          self.assertEqual(
     202              (signal.SIGINT, m_signal.default_int_handler),
     203              m_signal.signal.call_args[0])
     204  
     205      @mock.patch('asyncio.unix_events.signal')
     206      @mock.patch('asyncio.base_events.logger')
     207      def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal):
     208          m_signal.NSIG = signal.NSIG
     209          m_signal.valid_signals = signal.valid_signals
     210          self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
     211  
     212          m_signal.set_wakeup_fd.side_effect = ValueError
     213  
     214          self.loop.remove_signal_handler(signal.SIGHUP)
     215          self.assertTrue(m_logging.info)
     216  
     217      @mock.patch('asyncio.unix_events.signal')
     218      def test_remove_signal_handler_error(self, m_signal):
     219          m_signal.NSIG = signal.NSIG
     220          m_signal.valid_signals = signal.valid_signals
     221          self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
     222  
     223          m_signal.signal.side_effect = OSError
     224  
     225          self.assertRaises(
     226              OSError, self.loop.remove_signal_handler, signal.SIGHUP)
     227  
     228      @mock.patch('asyncio.unix_events.signal')
     229      def test_remove_signal_handler_error2(self, m_signal):
     230          m_signal.NSIG = signal.NSIG
     231          m_signal.valid_signals = signal.valid_signals
     232          self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
     233  
     234          class ESC[4;38;5;81mErr(ESC[4;38;5;149mOSError):
     235              errno = errno.EINVAL
     236          m_signal.signal.side_effect = Err
     237  
     238          self.assertRaises(
     239              RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP)
     240  
     241      @mock.patch('asyncio.unix_events.signal')
     242      def test_close(self, m_signal):
     243          m_signal.NSIG = signal.NSIG
     244          m_signal.valid_signals = signal.valid_signals
     245  
     246          self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
     247          self.loop.add_signal_handler(signal.SIGCHLD, lambda: True)
     248  
     249          self.assertEqual(len(self.loop._signal_handlers), 2)
     250  
     251          m_signal.set_wakeup_fd.reset_mock()
     252  
     253          self.loop.close()
     254  
     255          self.assertEqual(len(self.loop._signal_handlers), 0)
     256          m_signal.set_wakeup_fd.assert_called_once_with(-1)
     257  
     258      @mock.patch('asyncio.unix_events.sys')
     259      @mock.patch('asyncio.unix_events.signal')
     260      def test_close_on_finalizing(self, m_signal, m_sys):
     261          m_signal.NSIG = signal.NSIG
     262          m_signal.valid_signals = signal.valid_signals
     263          self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
     264  
     265          self.assertEqual(len(self.loop._signal_handlers), 1)
     266          m_sys.is_finalizing.return_value = True
     267          m_signal.signal.reset_mock()
     268  
     269          with self.assertWarnsRegex(ResourceWarning,
     270                                     "skipping signal handlers removal"):
     271              self.loop.close()
     272  
     273          self.assertEqual(len(self.loop._signal_handlers), 0)
     274          self.assertFalse(m_signal.signal.called)
     275  
     276  
     277  @unittest.skipUnless(hasattr(socket, 'AF_UNIX'),
     278                       'UNIX Sockets are not supported')
     279  class ESC[4;38;5;81mSelectorEventLoopUnixSocketTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     280  
     281      def setUp(self):
     282          super().setUp()
     283          self.loop = asyncio.SelectorEventLoop()
     284          self.set_event_loop(self.loop)
     285  
     286      @socket_helper.skip_unless_bind_unix_socket
     287      def test_create_unix_server_existing_path_sock(self):
     288          with test_utils.unix_socket_path() as path:
     289              sock = socket.socket(socket.AF_UNIX)
     290              sock.bind(path)
     291              sock.listen(1)
     292              sock.close()
     293  
     294              coro = self.loop.create_unix_server(lambda: None, path)
     295              srv = self.loop.run_until_complete(coro)
     296              srv.close()
     297              self.loop.run_until_complete(srv.wait_closed())
     298  
     299      @socket_helper.skip_unless_bind_unix_socket
     300      def test_create_unix_server_pathlib(self):
     301          with test_utils.unix_socket_path() as path:
     302              path = pathlib.Path(path)
     303              srv_coro = self.loop.create_unix_server(lambda: None, path)
     304              srv = self.loop.run_until_complete(srv_coro)
     305              srv.close()
     306              self.loop.run_until_complete(srv.wait_closed())
     307  
     308      def test_create_unix_connection_pathlib(self):
     309          with test_utils.unix_socket_path() as path:
     310              path = pathlib.Path(path)
     311              coro = self.loop.create_unix_connection(lambda: None, path)
     312              with self.assertRaises(FileNotFoundError):
     313                  # If pathlib.Path wasn't supported, the exception would be
     314                  # different.
     315                  self.loop.run_until_complete(coro)
     316  
     317      def test_create_unix_server_existing_path_nonsock(self):
     318          with tempfile.NamedTemporaryFile() as file:
     319              coro = self.loop.create_unix_server(lambda: None, file.name)
     320              with self.assertRaisesRegex(OSError,
     321                                          'Address.*is already in use'):
     322                  self.loop.run_until_complete(coro)
     323  
     324      def test_create_unix_server_ssl_bool(self):
     325          coro = self.loop.create_unix_server(lambda: None, path='spam',
     326                                              ssl=True)
     327          with self.assertRaisesRegex(TypeError,
     328                                      'ssl argument must be an SSLContext'):
     329              self.loop.run_until_complete(coro)
     330  
     331      def test_create_unix_server_nopath_nosock(self):
     332          coro = self.loop.create_unix_server(lambda: None, path=None)
     333          with self.assertRaisesRegex(ValueError,
     334                                      'path was not specified, and no sock'):
     335              self.loop.run_until_complete(coro)
     336  
     337      def test_create_unix_server_path_inetsock(self):
     338          sock = socket.socket()
     339          with sock:
     340              coro = self.loop.create_unix_server(lambda: None, path=None,
     341                                                  sock=sock)
     342              with self.assertRaisesRegex(ValueError,
     343                                          'A UNIX Domain Stream.*was expected'):
     344                  self.loop.run_until_complete(coro)
     345  
     346      def test_create_unix_server_path_dgram(self):
     347          sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
     348          with sock:
     349              coro = self.loop.create_unix_server(lambda: None, path=None,
     350                                                  sock=sock)
     351              with self.assertRaisesRegex(ValueError,
     352                                          'A UNIX Domain Stream.*was expected'):
     353                  self.loop.run_until_complete(coro)
     354  
     355      @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
     356                           'no socket.SOCK_NONBLOCK (linux only)')
     357      @socket_helper.skip_unless_bind_unix_socket
     358      def test_create_unix_server_path_stream_bittype(self):
     359          sock = socket.socket(
     360              socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
     361          with tempfile.NamedTemporaryFile() as file:
     362              fn = file.name
     363          try:
     364              with sock:
     365                  sock.bind(fn)
     366                  coro = self.loop.create_unix_server(lambda: None, path=None,
     367                                                      sock=sock)
     368                  srv = self.loop.run_until_complete(coro)
     369                  srv.close()
     370                  self.loop.run_until_complete(srv.wait_closed())
     371          finally:
     372              os.unlink(fn)
     373  
     374      def test_create_unix_server_ssl_timeout_with_plain_sock(self):
     375          coro = self.loop.create_unix_server(lambda: None, path='spam',
     376                                              ssl_handshake_timeout=1)
     377          with self.assertRaisesRegex(
     378                  ValueError,
     379                  'ssl_handshake_timeout is only meaningful with ssl'):
     380              self.loop.run_until_complete(coro)
     381  
     382      def test_create_unix_connection_path_inetsock(self):
     383          sock = socket.socket()
     384          with sock:
     385              coro = self.loop.create_unix_connection(lambda: None,
     386                                                      sock=sock)
     387              with self.assertRaisesRegex(ValueError,
     388                                          'A UNIX Domain Stream.*was expected'):
     389                  self.loop.run_until_complete(coro)
     390  
     391      @mock.patch('asyncio.unix_events.socket')
     392      def test_create_unix_server_bind_error(self, m_socket):
     393          # Ensure that the socket is closed on any bind error
     394          sock = mock.Mock()
     395          m_socket.socket.return_value = sock
     396  
     397          sock.bind.side_effect = OSError
     398          coro = self.loop.create_unix_server(lambda: None, path="/test")
     399          with self.assertRaises(OSError):
     400              self.loop.run_until_complete(coro)
     401          self.assertTrue(sock.close.called)
     402  
     403          sock.bind.side_effect = MemoryError
     404          coro = self.loop.create_unix_server(lambda: None, path="/test")
     405          with self.assertRaises(MemoryError):
     406              self.loop.run_until_complete(coro)
     407          self.assertTrue(sock.close.called)
     408  
     409      def test_create_unix_connection_path_sock(self):
     410          coro = self.loop.create_unix_connection(
     411              lambda: None, os.devnull, sock=object())
     412          with self.assertRaisesRegex(ValueError, 'path and sock can not be'):
     413              self.loop.run_until_complete(coro)
     414  
     415      def test_create_unix_connection_nopath_nosock(self):
     416          coro = self.loop.create_unix_connection(
     417              lambda: None, None)
     418          with self.assertRaisesRegex(ValueError,
     419                                      'no path and sock were specified'):
     420              self.loop.run_until_complete(coro)
     421  
     422      def test_create_unix_connection_nossl_serverhost(self):
     423          coro = self.loop.create_unix_connection(
     424              lambda: None, os.devnull, server_hostname='spam')
     425          with self.assertRaisesRegex(ValueError,
     426                                      'server_hostname is only meaningful'):
     427              self.loop.run_until_complete(coro)
     428  
     429      def test_create_unix_connection_ssl_noserverhost(self):
     430          coro = self.loop.create_unix_connection(
     431              lambda: None, os.devnull, ssl=True)
     432  
     433          with self.assertRaisesRegex(
     434              ValueError, 'you have to pass server_hostname when using ssl'):
     435  
     436              self.loop.run_until_complete(coro)
     437  
     438      def test_create_unix_connection_ssl_timeout_with_plain_sock(self):
     439          coro = self.loop.create_unix_connection(lambda: None, path='spam',
     440                                              ssl_handshake_timeout=1)
     441          with self.assertRaisesRegex(
     442                  ValueError,
     443                  'ssl_handshake_timeout is only meaningful with ssl'):
     444              self.loop.run_until_complete(coro)
     445  
     446  
     447  @unittest.skipUnless(hasattr(os, 'sendfile'),
     448                       'sendfile is not supported')
     449  class ESC[4;38;5;81mSelectorEventLoopUnixSockSendfileTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     450      DATA = b"12345abcde" * 16 * 1024  # 160 KiB
     451  
     452      class ESC[4;38;5;81mMyProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mProtocol):
     453  
     454          def __init__(self, loop):
     455              self.started = False
     456              self.closed = False
     457              self.data = bytearray()
     458              self.fut = loop.create_future()
     459              self.transport = None
     460              self._ready = loop.create_future()
     461  
     462          def connection_made(self, transport):
     463              self.started = True
     464              self.transport = transport
     465              self._ready.set_result(None)
     466  
     467          def data_received(self, data):
     468              self.data.extend(data)
     469  
     470          def connection_lost(self, exc):
     471              self.closed = True
     472              self.fut.set_result(None)
     473  
     474          async def wait_closed(self):
     475              await self.fut
     476  
     477      @classmethod
     478      def setUpClass(cls):
     479          with open(os_helper.TESTFN, 'wb') as fp:
     480              fp.write(cls.DATA)
     481          super().setUpClass()
     482  
     483      @classmethod
     484      def tearDownClass(cls):
     485          os_helper.unlink(os_helper.TESTFN)
     486          super().tearDownClass()
     487  
     488      def setUp(self):
     489          self.loop = asyncio.new_event_loop()
     490          self.set_event_loop(self.loop)
     491          self.file = open(os_helper.TESTFN, 'rb')
     492          self.addCleanup(self.file.close)
     493          super().setUp()
     494  
     495      def make_socket(self, cleanup=True):
     496          sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     497          sock.setblocking(False)
     498          sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
     499          sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
     500          if cleanup:
     501              self.addCleanup(sock.close)
     502          return sock
     503  
     504      def run_loop(self, coro):
     505          return self.loop.run_until_complete(coro)
     506  
     507      def prepare(self):
     508          sock = self.make_socket()
     509          proto = self.MyProto(self.loop)
     510          port = socket_helper.find_unused_port()
     511          srv_sock = self.make_socket(cleanup=False)
     512          srv_sock.bind((socket_helper.HOST, port))
     513          server = self.run_loop(self.loop.create_server(
     514              lambda: proto, sock=srv_sock))
     515          self.run_loop(self.loop.sock_connect(sock, (socket_helper.HOST, port)))
     516          self.run_loop(proto._ready)
     517  
     518          def cleanup():
     519              proto.transport.close()
     520              self.run_loop(proto.wait_closed())
     521  
     522              server.close()
     523              self.run_loop(server.wait_closed())
     524  
     525          self.addCleanup(cleanup)
     526  
     527          return sock, proto
     528  
     529      def test_sock_sendfile_not_available(self):
     530          sock, proto = self.prepare()
     531          with mock.patch('asyncio.unix_events.os', spec=[]):
     532              with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
     533                                          "os[.]sendfile[(][)] is not available"):
     534                  self.run_loop(self.loop._sock_sendfile_native(sock, self.file,
     535                                                                0, None))
     536          self.assertEqual(self.file.tell(), 0)
     537  
     538      def test_sock_sendfile_not_a_file(self):
     539          sock, proto = self.prepare()
     540          f = object()
     541          with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
     542                                      "not a regular file"):
     543              self.run_loop(self.loop._sock_sendfile_native(sock, f,
     544                                                            0, None))
     545          self.assertEqual(self.file.tell(), 0)
     546  
     547      def test_sock_sendfile_iobuffer(self):
     548          sock, proto = self.prepare()
     549          f = io.BytesIO()
     550          with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
     551                                      "not a regular file"):
     552              self.run_loop(self.loop._sock_sendfile_native(sock, f,
     553                                                            0, None))
     554          self.assertEqual(self.file.tell(), 0)
     555  
     556      def test_sock_sendfile_not_regular_file(self):
     557          sock, proto = self.prepare()
     558          f = mock.Mock()
     559          f.fileno.return_value = -1
     560          with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
     561                                      "not a regular file"):
     562              self.run_loop(self.loop._sock_sendfile_native(sock, f,
     563                                                            0, None))
     564          self.assertEqual(self.file.tell(), 0)
     565  
     566      def test_sock_sendfile_cancel1(self):
     567          sock, proto = self.prepare()
     568  
     569          fut = self.loop.create_future()
     570          fileno = self.file.fileno()
     571          self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
     572                                               0, None, len(self.DATA), 0)
     573          fut.cancel()
     574          with contextlib.suppress(asyncio.CancelledError):
     575              self.run_loop(fut)
     576          with self.assertRaises(KeyError):
     577              self.loop._selector.get_key(sock)
     578  
     579      def test_sock_sendfile_cancel2(self):
     580          sock, proto = self.prepare()
     581  
     582          fut = self.loop.create_future()
     583          fileno = self.file.fileno()
     584          self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
     585                                               0, None, len(self.DATA), 0)
     586          fut.cancel()
     587          self.loop._sock_sendfile_native_impl(fut, sock.fileno(), sock, fileno,
     588                                               0, None, len(self.DATA), 0)
     589          with self.assertRaises(KeyError):
     590              self.loop._selector.get_key(sock)
     591  
     592      def test_sock_sendfile_blocking_error(self):
     593          sock, proto = self.prepare()
     594  
     595          fileno = self.file.fileno()
     596          fut = mock.Mock()
     597          fut.cancelled.return_value = False
     598          with mock.patch('os.sendfile', side_effect=BlockingIOError()):
     599              self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
     600                                                   0, None, len(self.DATA), 0)
     601          key = self.loop._selector.get_key(sock)
     602          self.assertIsNotNone(key)
     603          fut.add_done_callback.assert_called_once_with(mock.ANY)
     604  
     605      def test_sock_sendfile_os_error_first_call(self):
     606          sock, proto = self.prepare()
     607  
     608          fileno = self.file.fileno()
     609          fut = self.loop.create_future()
     610          with mock.patch('os.sendfile', side_effect=OSError()):
     611              self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
     612                                                   0, None, len(self.DATA), 0)
     613          with self.assertRaises(KeyError):
     614              self.loop._selector.get_key(sock)
     615          exc = fut.exception()
     616          self.assertIsInstance(exc, asyncio.SendfileNotAvailableError)
     617          self.assertEqual(0, self.file.tell())
     618  
     619      def test_sock_sendfile_os_error_next_call(self):
     620          sock, proto = self.prepare()
     621  
     622          fileno = self.file.fileno()
     623          fut = self.loop.create_future()
     624          err = OSError()
     625          with mock.patch('os.sendfile', side_effect=err):
     626              self.loop._sock_sendfile_native_impl(fut, sock.fileno(),
     627                                                   sock, fileno,
     628                                                   1000, None, len(self.DATA),
     629                                                   1000)
     630          with self.assertRaises(KeyError):
     631              self.loop._selector.get_key(sock)
     632          exc = fut.exception()
     633          self.assertIs(exc, err)
     634          self.assertEqual(1000, self.file.tell())
     635  
     636      def test_sock_sendfile_exception(self):
     637          sock, proto = self.prepare()
     638  
     639          fileno = self.file.fileno()
     640          fut = self.loop.create_future()
     641          err = asyncio.SendfileNotAvailableError()
     642          with mock.patch('os.sendfile', side_effect=err):
     643              self.loop._sock_sendfile_native_impl(fut, sock.fileno(),
     644                                                   sock, fileno,
     645                                                   1000, None, len(self.DATA),
     646                                                   1000)
     647          with self.assertRaises(KeyError):
     648              self.loop._selector.get_key(sock)
     649          exc = fut.exception()
     650          self.assertIs(exc, err)
     651          self.assertEqual(1000, self.file.tell())
     652  
     653  
     class UnixReadPipeTransportTests(test_utils.TestCase):
         """Tests for unix_events._UnixReadPipeTransport over a mocked pipe.

         The pipe is a Mock specced on io.RawIOBase whose fileno() is 5;
         os.set_blocking and os.fstat are patched for the whole test, with
         fstat reporting a FIFO (stat.S_IFIFO).
         """

         def setUp(self):
             super().setUp()
             self.loop = self.new_test_loop()
             self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
             self.pipe = mock.Mock(spec_set=io.RawIOBase)
             self.pipe.fileno.return_value = 5

             blocking_patcher = mock.patch('os.set_blocking')
             blocking_patcher.start()
             self.addCleanup(blocking_patcher.stop)

             # Make the mocked descriptor look like a FIFO.
             fstat_patcher = mock.patch('os.fstat')
             m_fstat = fstat_patcher.start()
             st = mock.Mock()
             st.st_mode = stat.S_IFIFO
             m_fstat.return_value = st
             self.addCleanup(fstat_patcher.stop)

         def read_pipe_transport(self, waiter=None):
             # Build the transport under test and register a cleanup that
             # closes its pipe via the module-level close_pipe_transport().
             transport = unix_events._UnixReadPipeTransport(
                 self.loop, self.pipe, self.protocol, waiter=waiter)
             self.addCleanup(close_pipe_transport, transport)
             return transport

         def test_ctor(self):
             waiter = self.loop.create_future()
             tr = self.read_pipe_transport(waiter=waiter)
             self.loop.run_until_complete(waiter)

             self.protocol.connection_made.assert_called_with(tr)
             self.loop.assert_reader(5, tr._read_ready)
             self.assertIsNone(waiter.result())

         @mock.patch('os.read')
         def test__read_ready(self, m_read):
             # Data returned by os.read is handed to protocol.data_received.
             tr = self.read_pipe_transport()
             m_read.return_value = b'data'
             tr._read_ready()

             m_read.assert_called_with(5, tr.max_size)
             self.protocol.data_received.assert_called_with(b'data')

         @mock.patch('os.read')
         def test__read_ready_eof(self, m_read):
             # An empty read: the reader is removed, then eof_received() and
             # connection_lost(None) are reported once the loop has run.
             tr = self.read_pipe_transport()
             m_read.return_value = b''
             tr._read_ready()

             m_read.assert_called_with(5, tr.max_size)
             self.assertFalse(self.loop.readers)
             test_utils.run_briefly(self.loop)
             self.protocol.eof_received.assert_called_with()
             self.protocol.connection_lost.assert_called_with(None)

         @mock.patch('os.read')
         def test__read_ready_blocked(self, m_read):
             # BlockingIOError from os.read must not deliver any data.
             tr = self.read_pipe_transport()
             m_read.side_effect = BlockingIOError
             tr._read_ready()

             m_read.assert_called_with(5, tr.max_size)
             test_utils.run_briefly(self.loop)
             self.assertFalse(self.protocol.data_received.called)

         @mock.patch('asyncio.log.logger.error')
         @mock.patch('os.read')
         def test__read_ready_error(self, m_read, m_logexc):
             # An OSError from os.read is logged and passed to _close().
             tr = self.read_pipe_transport()
             err = OSError()
             m_read.side_effect = err
             tr._close = mock.Mock()
             tr._read_ready()

             m_read.assert_called_with(5, tr.max_size)
             tr._close.assert_called_with(err)
             m_logexc.assert_called_with(
                 test_utils.MockPattern(
                     'Fatal read error on pipe transport'
                     '\nprotocol:.*\ntransport:.*'),
                 exc_info=(OSError, MOCK_ANY, MOCK_ANY))

         @mock.patch('os.read')
         def test_pause_reading(self, m_read):
             tr = self.read_pipe_transport()
             m = mock.Mock()
             self.loop.add_reader(5, m)
             tr.pause_reading()
             self.assertFalse(self.loop.readers)

         @mock.patch('os.read')
         def test_resume_reading(self, m_read):
             tr = self.read_pipe_transport()
             tr.pause_reading()
             tr.resume_reading()
             self.loop.assert_reader(5, tr._read_ready)

         @mock.patch('os.read')
         def test_close(self, m_read):
             tr = self.read_pipe_transport()
             tr._close = mock.Mock()
             tr.close()
             tr._close.assert_called_with(None)

         @mock.patch('os.read')
         def test_close_already_closing(self, m_read):
             # close() on a transport already marked closing is a no-op.
             tr = self.read_pipe_transport()
             tr._closing = True
             tr._close = mock.Mock()
             tr.close()
             self.assertFalse(tr._close.called)

         @mock.patch('os.read')
         def test__close(self, m_read):
             tr = self.read_pipe_transport()
             err = object()
             tr._close(err)
             self.assertTrue(tr.is_closing())
             self.assertFalse(self.loop.readers)
             test_utils.run_briefly(self.loop)
             self.protocol.connection_lost.assert_called_with(err)

         def test__call_connection_lost(self):
             tr = self.read_pipe_transport()
             self.assertIsNotNone(tr._protocol)
             self.assertIsNotNone(tr._loop)

             err = None
             tr._call_connection_lost(err)
             self.protocol.connection_lost.assert_called_with(err)
             self.pipe.close.assert_called_with()

             # The transport drops its protocol and loop references.
             self.assertIsNone(tr._protocol)
             self.assertIsNone(tr._loop)

         def test__call_connection_lost_with_err(self):
             tr = self.read_pipe_transport()
             self.assertIsNotNone(tr._protocol)
             self.assertIsNotNone(tr._loop)

             err = OSError()
             tr._call_connection_lost(err)
             self.protocol.connection_lost.assert_called_with(err)
             self.pipe.close.assert_called_with()

             # The transport drops its protocol and loop references.
             self.assertIsNone(tr._protocol)
             self.assertIsNone(tr._loop)

         def test_pause_reading_on_closed_pipe(self):
             # pause_reading() after the transport is closed must not raise.
             tr = self.read_pipe_transport()
             tr.close()
             test_utils.run_briefly(self.loop)
             self.assertIsNone(tr._loop)
             tr.pause_reading()

         def test_pause_reading_on_paused_pipe(self):
             tr = self.read_pipe_transport()
             tr.pause_reading()
             # the second call should do nothing
             tr.pause_reading()

         def test_resume_reading_on_closed_pipe(self):
             # resume_reading() after the transport is closed must not raise.
             tr = self.read_pipe_transport()
             tr.close()
             test_utils.run_briefly(self.loop)
             self.assertIsNone(tr._loop)
             tr.resume_reading()

         def test_resume_reading_on_paused_pipe(self):
             tr = self.read_pipe_transport()
             # the pipe is not paused
             # resuming should do nothing
             tr.resume_reading()
     829  
     830  
     class UnixWritePipeTransportTests(test_utils.TestCase):
         """Tests for unix_events._UnixWritePipeTransport over a mocked pipe.

         The pipe is a Mock specced on io.RawIOBase whose fileno() is 5;
         os.set_blocking and os.fstat are patched for the whole test, with
         fstat reporting a socket (stat.S_IFSOCK).
         """

         def setUp(self):
             super().setUp()
             self.loop = self.new_test_loop()
             self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
             self.pipe = mock.Mock(spec_set=io.RawIOBase)
             self.pipe.fileno.return_value = 5

             blocking_patcher = mock.patch('os.set_blocking')
             blocking_patcher.start()
             self.addCleanup(blocking_patcher.stop)

             # Make the mocked descriptor look like a socket.
             fstat_patcher = mock.patch('os.fstat')
             m_fstat = fstat_patcher.start()
             st = mock.Mock()
             st.st_mode = stat.S_IFSOCK
             m_fstat.return_value = st
             self.addCleanup(fstat_patcher.stop)

         def write_pipe_transport(self, waiter=None):
             # Build the transport under test and register a cleanup that
             # closes its pipe via the module-level close_pipe_transport().
             transport = unix_events._UnixWritePipeTransport(
                 self.loop, self.pipe, self.protocol, waiter=waiter)
             self.addCleanup(close_pipe_transport, transport)
             return transport

         def test_ctor(self):
             waiter = self.loop.create_future()
             tr = self.write_pipe_transport(waiter=waiter)
             self.loop.run_until_complete(waiter)

             self.protocol.connection_made.assert_called_with(tr)
             self.loop.assert_reader(5, tr._read_ready)
             self.assertIsNone(waiter.result())

         def test_can_write_eof(self):
             tr = self.write_pipe_transport()
             self.assertTrue(tr.can_write_eof())

         @mock.patch('os.write')
         def test_write(self, m_write):
             # A complete write leaves no writer registered and no buffer.
             tr = self.write_pipe_transport()
             m_write.return_value = 4
             tr.write(b'data')
             m_write.assert_called_with(5, b'data')
             self.assertFalse(self.loop.writers)
             self.assertEqual(bytearray(), tr._buffer)

         @mock.patch('os.write')
         def test_write_no_data(self, m_write):
             tr = self.write_pipe_transport()
             tr.write(b'')
             self.assertFalse(m_write.called)
             self.assertFalse(self.loop.writers)
             self.assertEqual(bytearray(b''), tr._buffer)

         @mock.patch('os.write')
         def test_write_partial(self, m_write):
             # Only 2 of 4 bytes written: the rest is buffered and a writer
             # callback is registered.
             tr = self.write_pipe_transport()
             m_write.return_value = 2
             tr.write(b'data')
             self.loop.assert_writer(5, tr._write_ready)
             self.assertEqual(bytearray(b'ta'), tr._buffer)

         @mock.patch('os.write')
         def test_write_buffer(self, m_write):
             # With data already buffered, write() appends without calling
             # os.write.
             tr = self.write_pipe_transport()
             self.loop.add_writer(5, tr._write_ready)
             tr._buffer = bytearray(b'previous')
             tr.write(b'data')
             self.assertFalse(m_write.called)
             self.loop.assert_writer(5, tr._write_ready)
             self.assertEqual(bytearray(b'previousdata'), tr._buffer)

         @mock.patch('os.write')
         def test_write_again(self, m_write):
             # BlockingIOError from os.write: everything is buffered.
             tr = self.write_pipe_transport()
             m_write.side_effect = BlockingIOError()
             tr.write(b'data')
             m_write.assert_called_with(5, bytearray(b'data'))
             self.loop.assert_writer(5, tr._write_ready)
             self.assertEqual(bytearray(b'data'), tr._buffer)

         @mock.patch('asyncio.unix_events.logger')
         @mock.patch('os.write')
         def test_write_err(self, m_write, m_log):
             tr = self.write_pipe_transport()
             err = OSError()
             m_write.side_effect = err
             tr._fatal_error = mock.Mock()
             tr.write(b'data')
             m_write.assert_called_with(5, b'data')
             self.assertFalse(self.loop.writers)
             self.assertEqual(bytearray(), tr._buffer)
             tr._fatal_error.assert_called_with(
                 err,
                 'Fatal write error on pipe transport')
             self.assertEqual(1, tr._conn_lost)

             # Each further write bumps _conn_lost; after several of them a
             # warning is expected to have been logged.
             tr.write(b'data')
             self.assertEqual(2, tr._conn_lost)
             tr.write(b'data')
             tr.write(b'data')
             tr.write(b'data')
             tr.write(b'data')
             # This is a bit overspecified. :-(
             m_log.warning.assert_called_with(
                 'pipe closed by peer or os.write(pipe, data) raised exception.')
             tr.close()

         @mock.patch('os.write')
         def test_write_close(self, m_write):
             tr = self.write_pipe_transport()
             tr._read_ready()  # pipe was closed by peer

             tr.write(b'data')
             self.assertEqual(tr._conn_lost, 1)
             tr.write(b'data')
             self.assertEqual(tr._conn_lost, 2)

         def test__read_ready(self):
             # _read_ready() on the write transport closes it and reports
             # connection_lost(None) once the loop has run.
             tr = self.write_pipe_transport()
             tr._read_ready()
             self.assertFalse(self.loop.readers)
             self.assertFalse(self.loop.writers)
             self.assertTrue(tr.is_closing())
             test_utils.run_briefly(self.loop)
             self.protocol.connection_lost.assert_called_with(None)

         @mock.patch('os.write')
         def test__write_ready(self, m_write):
             tr = self.write_pipe_transport()
             self.loop.add_writer(5, tr._write_ready)
             tr._buffer = bytearray(b'data')
             m_write.return_value = 4
             tr._write_ready()
             self.assertFalse(self.loop.writers)
             self.assertEqual(bytearray(), tr._buffer)

         @mock.patch('os.write')
         def test__write_ready_partial(self, m_write):
             tr = self.write_pipe_transport()
             self.loop.add_writer(5, tr._write_ready)
             tr._buffer = bytearray(b'data')
             m_write.return_value = 3
             tr._write_ready()
             self.loop.assert_writer(5, tr._write_ready)
             self.assertEqual(bytearray(b'a'), tr._buffer)

         @mock.patch('os.write')
         def test__write_ready_again(self, m_write):
             tr = self.write_pipe_transport()
             self.loop.add_writer(5, tr._write_ready)
             tr._buffer = bytearray(b'data')
             m_write.side_effect = BlockingIOError()
             tr._write_ready()
             m_write.assert_called_with(5, bytearray(b'data'))
             self.loop.assert_writer(5, tr._write_ready)
             self.assertEqual(bytearray(b'data'), tr._buffer)

         @mock.patch('os.write')
         def test__write_ready_empty(self, m_write):
             # os.write reporting 0 bytes written keeps the buffer and the
             # writer registration untouched.
             tr = self.write_pipe_transport()
             self.loop.add_writer(5, tr._write_ready)
             tr._buffer = bytearray(b'data')
             m_write.return_value = 0
             tr._write_ready()
             m_write.assert_called_with(5, bytearray(b'data'))
             self.loop.assert_writer(5, tr._write_ready)
             self.assertEqual(bytearray(b'data'), tr._buffer)

         @mock.patch('asyncio.log.logger.error')
         @mock.patch('os.write')
         def test__write_ready_err(self, m_write, m_logexc):
             # An OSError in _write_ready closes the transport and is passed
             # to connection_lost; the test expects nothing to be logged.
             tr = self.write_pipe_transport()
             self.loop.add_writer(5, tr._write_ready)
             tr._buffer = bytearray(b'data')
             m_write.side_effect = err = OSError()
             tr._write_ready()
             self.assertFalse(self.loop.writers)
             self.assertFalse(self.loop.readers)
             self.assertEqual(bytearray(), tr._buffer)
             self.assertTrue(tr.is_closing())
             m_logexc.assert_not_called()
             self.assertEqual(1, tr._conn_lost)
             test_utils.run_briefly(self.loop)
             self.protocol.connection_lost.assert_called_with(err)

         @mock.patch('os.write')
         def test__write_ready_closing(self, m_write):
             # Flushing the last buffered bytes of a closing transport
             # finishes the close: connection_lost(None) and pipe.close().
             tr = self.write_pipe_transport()
             self.loop.add_writer(5, tr._write_ready)
             tr._closing = True
             tr._buffer = bytearray(b'data')
             m_write.return_value = 4
             tr._write_ready()
             self.assertFalse(self.loop.writers)
             self.assertFalse(self.loop.readers)
             self.assertEqual(bytearray(), tr._buffer)
             self.protocol.connection_lost.assert_called_with(None)
             self.pipe.close.assert_called_with()

         @mock.patch('os.write')
         def test_abort(self, m_write):
             # abort() discards buffered data without writing it.
             tr = self.write_pipe_transport()
             self.loop.add_writer(5, tr._write_ready)
             self.loop.add_reader(5, tr._read_ready)
             tr._buffer = [b'da', b'ta']
             tr.abort()
             self.assertFalse(m_write.called)
             self.assertFalse(self.loop.readers)
             self.assertFalse(self.loop.writers)
             self.assertEqual([], tr._buffer)
             self.assertTrue(tr.is_closing())
             test_utils.run_briefly(self.loop)
             self.protocol.connection_lost.assert_called_with(None)

         def test__call_connection_lost(self):
             tr = self.write_pipe_transport()
             self.assertIsNotNone(tr._protocol)
             self.assertIsNotNone(tr._loop)

             err = None
             tr._call_connection_lost(err)
             self.protocol.connection_lost.assert_called_with(err)
             self.pipe.close.assert_called_with()

             # The transport drops its protocol and loop references.
             self.assertIsNone(tr._protocol)
             self.assertIsNone(tr._loop)

         def test__call_connection_lost_with_err(self):
             tr = self.write_pipe_transport()
             self.assertIsNotNone(tr._protocol)
             self.assertIsNotNone(tr._loop)

             err = OSError()
             tr._call_connection_lost(err)
             self.protocol.connection_lost.assert_called_with(err)
             self.pipe.close.assert_called_with()

             # The transport drops its protocol and loop references.
             self.assertIsNone(tr._protocol)
             self.assertIsNone(tr._loop)

         def test_close(self):
             tr = self.write_pipe_transport()
             tr.write_eof = mock.Mock()
             tr.close()
             tr.write_eof.assert_called_with()

             # closing the transport twice must not fail
             tr.close()

         def test_close_closing(self):
             # close() on a transport already marked closing skips write_eof().
             tr = self.write_pipe_transport()
             tr.write_eof = mock.Mock()
             tr._closing = True
             tr.close()
             self.assertFalse(tr.write_eof.called)

         def test_write_eof(self):
             tr = self.write_pipe_transport()
             tr.write_eof()
             self.assertTrue(tr.is_closing())
             self.assertFalse(self.loop.readers)
             test_utils.run_briefly(self.loop)
             self.protocol.connection_lost.assert_called_with(None)

         def test_write_eof_pending(self):
             # With data still buffered, write_eof() marks the transport as
             # closing but connection_lost is not called yet.
             tr = self.write_pipe_transport()
             tr._buffer = [b'data']
             tr.write_eof()
             self.assertTrue(tr.is_closing())
             self.assertFalse(self.protocol.connection_lost.called)
    1105  
    1106  
    class AbstractChildWatcherTests(unittest.TestCase):
        """Check that asyncio.AbstractChildWatcher leaves its API abstract."""

        def test_not_implemented(self):
            # Every method called here is expected to raise
            # NotImplementedError; the Mock stands in for any argument.
            f = mock.Mock()
            watcher = asyncio.AbstractChildWatcher()
            self.assertRaises(
                NotImplementedError, watcher.add_child_handler, f, f)
            self.assertRaises(
                NotImplementedError, watcher.remove_child_handler, f)
            self.assertRaises(
                NotImplementedError, watcher.attach_loop, f)
            self.assertRaises(
                NotImplementedError, watcher.close)
            self.assertRaises(
                NotImplementedError, watcher.is_active)
            self.assertRaises(
                NotImplementedError, watcher.__enter__)
            self.assertRaises(
                NotImplementedError, watcher.__exit__, f, f, f)
    1126  
    1127  
    class BaseChildWatcherTests(unittest.TestCase):
        """Check that unix_events.BaseChildWatcher leaves _do_waitpid abstract."""

        def test_not_implemented(self):
            # _do_waitpid is expected to raise NotImplementedError on the
            # base class; the Mock stands in for the pid argument.
            f = mock.Mock()
            watcher = unix_events.BaseChildWatcher()
            self.assertRaises(
                NotImplementedError, watcher._do_waitpid, f)
    1135  
    1136  
    1137  class ESC[4;38;5;81mChildWatcherTestsMixin:
    1138  
    1139      ignore_warnings = mock.patch.object(log.logger, "warning")
    1140  
    def setUp(self):
        # Fresh test loop and empty bookkeeping for the fake waitpid().
        super().setUp()
        self.loop = self.new_test_loop()
        self.running = False
        self.zombies = dict()

        # Record the add_signal_handler call made while the watcher is
        # attached; the mock is kept on self for later assertions.
        handler_patch = mock.patch.object(self.loop, "add_signal_handler")
        with handler_patch as self.m_add_signal_handler:
            watcher = self.create_watcher()
            self.watcher = watcher
            watcher.attach_loop(self.loop)
    1151  
    1152      def waitpid(self, pid, flags):
    1153          if isinstance(self.watcher, asyncio.SafeChildWatcher) or pid != -1:
    1154              self.assertGreater(pid, 0)
    1155          try:
    1156              if pid < 0:
    1157                  return self.zombies.popitem()
    1158              else:
    1159                  return pid, self.zombies.pop(pid)
    1160          except KeyError:
    1161              pass
    1162          if self.running:
    1163              return 0, 0
    1164          else:
    1165              raise ChildProcessError()
    1166  
    1167      def add_zombie(self, pid, status):
    1168          self.zombies[pid] = status
    1169  
    def waitstatus_to_exitcode(self, status):
        # Decode the fake statuses used by these tests: anything above
        # 32700 except 32768 itself is offset by 32768 (positive result for
        # values above 32768, negative below); every other value is
        # returned unchanged.
        if status > 32700 and status != 32768:
            return status - 32768
        return status
    1177  
    1178      def test_create_watcher(self):
    1179          self.m_add_signal_handler.assert_called_once_with(
    1180              signal.SIGCHLD, self.watcher._sig_chld)
    1181  
    1182      def waitpid_mocks(func):
    1183          def wrapped_func(self):
    1184              def patch(target, wrapper):
    1185                  return mock.patch(target, wraps=wrapper,
    1186                                    new_callable=mock.Mock)
    1187  
    1188              with patch('asyncio.unix_events.waitstatus_to_exitcode', self.waitstatus_to_exitcode), \
    1189                   patch('os.waitpid', self.waitpid) as m_waitpid:
    1190                  func(self, m_waitpid)
    1191          return wrapped_func
    1192  
    @waitpid_mocks
    def test_sigchld(self, m_waitpid):
        # One watched child: its handler must fire exactly once, when the
        # child is reaped, and never again afterwards.
        on_exit = mock.Mock()

        # Register the child while it is still running.
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(42, on_exit, 9, 10, 14)
        on_exit.assert_not_called()

        # SIGCHLD arrives but the child is still alive.
        self.watcher._sig_chld()
        on_exit.assert_not_called()

        # The child terminates with return code 12.
        self.running = False
        self.add_zombie(42, EXITCODE(12))
        self.watcher._sig_chld()
        on_exit.assert_called_once_with(42, 12, 9, 10, 14)
        on_exit.reset_mock()

        # The child was really reaped: a new zombie with the same pid does
        # not reach the old handler.
        self.add_zombie(42, EXITCODE(13))
        with self.ignore_warnings:
            self.watcher._sig_chld()
        on_exit.assert_not_called()

        # One more SIGCHLD with nothing left to collect.
        self.zombies.clear()
        self.watcher._sig_chld()
        on_exit.assert_not_called()
    1230  
    @waitpid_mocks
    def test_sigchld_two_children(self, m_waitpid):
        # Two watched children that terminate at different times: each
        # handler fires once, for its own child only.
        first_cb = mock.Mock()
        second_cb = mock.Mock()

        # Register child 1.
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(43, first_cb, 7, 8)
        first_cb.assert_not_called()
        second_cb.assert_not_called()

        # Register child 2.
        with self.watcher:
            self.watcher.add_child_handler(44, second_cb, 147, 18)
        first_cb.assert_not_called()
        second_cb.assert_not_called()

        # Both children are still running.
        self.watcher._sig_chld()
        first_cb.assert_not_called()
        second_cb.assert_not_called()

        # Child 1 is killed by signal 3.
        self.add_zombie(43, SIGNAL(3))
        self.watcher._sig_chld()
        first_cb.assert_called_once_with(43, -3, 7, 8)
        second_cb.assert_not_called()
        first_cb.reset_mock()

        # Child 2 is still running.
        self.watcher._sig_chld()
        first_cb.assert_not_called()
        second_cb.assert_not_called()

        # Child 2 exits with code 108.
        self.add_zombie(44, EXITCODE(108))
        self.running = False
        self.watcher._sig_chld()
        second_cb.assert_called_once_with(44, 108, 147, 18)
        first_cb.assert_not_called()
        second_cb.reset_mock()

        # Both children were really reaped: new zombies with the same
        # pids do not reach the old handlers.
        self.add_zombie(43, EXITCODE(14))
        self.add_zombie(44, EXITCODE(15))
        with self.ignore_warnings:
            self.watcher._sig_chld()
        first_cb.assert_not_called()
        second_cb.assert_not_called()

        # One more SIGCHLD with nothing left to collect.
        self.zombies.clear()
        self.watcher._sig_chld()
        first_cb.assert_not_called()
        second_cb.assert_not_called()
    1297  
    @waitpid_mocks
    def test_sigchld_two_children_terminating_together(self, m_waitpid):
        # Two watched children that are both reaped by a single SIGCHLD.
        first_cb = mock.Mock()
        second_cb = mock.Mock()

        # Register child 1.
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(45, first_cb, 17, 8)
        first_cb.assert_not_called()
        second_cb.assert_not_called()

        # Register child 2.
        with self.watcher:
            self.watcher.add_child_handler(46, second_cb, 1147, 18)
        first_cb.assert_not_called()
        second_cb.assert_not_called()

        # Both children are still running.
        self.watcher._sig_chld()
        first_cb.assert_not_called()
        second_cb.assert_not_called()

        # Child 1 exits with code 78 and child 2 is killed by signal 5
        # before the next SIGCHLD is handled.
        self.add_zombie(45, EXITCODE(78))
        self.add_zombie(46, SIGNAL(5))
        self.running = False
        self.watcher._sig_chld()
        first_cb.assert_called_once_with(45, 78, 17, 8)
        second_cb.assert_called_once_with(46, -5, 1147, 18)
        first_cb.reset_mock()
        second_cb.reset_mock()

        # Both children were really reaped: new zombies with the same
        # pids do not reach the old handlers.
        self.add_zombie(45, EXITCODE(14))
        self.add_zombie(46, EXITCODE(15))
        with self.ignore_warnings:
            self.watcher._sig_chld()
        first_cb.assert_not_called()
        second_cb.assert_not_called()
    1345  
    @waitpid_mocks
    def test_sigchld_race_condition(self, m_waitpid):
        # The child terminates, and SIGCHLD is handled, before its handler
        # is registered; the handler must still be called with the status.
        on_exit = mock.Mock()

        with self.watcher:
            # Termination and signal delivery come first ...
            self.add_zombie(50, EXITCODE(4))
            self.watcher._sig_chld()

            # ... and only then is the handler registered.
            self.watcher.add_child_handler(50, on_exit, 1, 12)

        on_exit.assert_called_once_with(50, 4, 1, 12)
        on_exit.reset_mock()

        # The child was really reaped: a new zombie with the same pid does
        # not reach the old handler.
        self.add_zombie(50, SIGNAL(1))
        with self.ignore_warnings:
            self.watcher._sig_chld()
        on_exit.assert_not_called()
    1367  
    @waitpid_mocks
    def test_sigchld_replace_handler(self, m_waitpid):
        # Registering a second handler for the same pid replaces the first
        # one: only the second handler is called on termination.
        old_cb = mock.Mock()
        new_cb = mock.Mock()

        # Register the child.
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(51, old_cb, 19)
        old_cb.assert_not_called()
        new_cb.assert_not_called()

        # Register the same child again with another handler.
        with self.watcher:
            self.watcher.add_child_handler(51, new_cb, 21)
        old_cb.assert_not_called()
        new_cb.assert_not_called()

        # The child is killed by signal 8.
        self.running = False
        self.add_zombie(51, SIGNAL(8))
        self.watcher._sig_chld()
        new_cb.assert_called_once_with(51, -8, 21)
        old_cb.assert_not_called()
        new_cb.reset_mock()

        # The child was really reaped: a new zombie with the same pid does
        # not reach either handler.
        self.add_zombie(51, EXITCODE(13))
        with self.ignore_warnings:
            self.watcher._sig_chld()
        old_cb.assert_not_called()
        new_cb.assert_not_called()
    1405  
    @waitpid_mocks
    def test_sigchld_remove_handler(self, m_waitpid):
        # A handler removed before the child terminates is never called.
        on_exit = mock.Mock()

        # Register the child.
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(52, on_exit, 1984)
        on_exit.assert_not_called()

        # Unregister it again.
        self.watcher.remove_child_handler(52)
        on_exit.assert_not_called()

        # The child exits with code 99.
        self.running = False
        self.add_zombie(52, EXITCODE(99))
        with self.ignore_warnings:
            self.watcher._sig_chld()
        on_exit.assert_not_called()
    1429  
    @waitpid_mocks
    def test_sigchld_unknown_status(self, m_waitpid):
        # A status stored directly in self.zombies (not through
        # add_zombie()) is expected to be handed to the handler unchanged.
        pid = 53
        raw_status = 1178
        handler = mock.Mock()

        # watch the child
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(pid, handler, -19)

        handler.assert_not_called()

        # the child ends with the raw status
        self.zombies[pid] = raw_status
        self.running = False
        self.watcher._sig_chld()

        handler.assert_called_once_with(pid, raw_status, -19)

        handler.reset_mock()

        # a further status for the same pid must not reach the handler
        self.add_zombie(pid, EXITCODE(101))
        with self.ignore_warnings:
            self.watcher._sig_chld()

        handler.assert_not_called()
    1456  
    @waitpid_mocks
    def test_remove_child_handler(self, m_waitpid):
        # remove_child_handler() returns a true value only when a handler
        # was actually removed; removed handlers are not called afterwards.
        handlers = {pid: mock.Mock() for pid in (54, 55, 56)}

        # watch pids 54, 55, 56 with extra arguments 1, 2, 3
        with self.watcher:
            self.running = True
            for arg, (pid, handler) in enumerate(handlers.items(), start=1):
                self.watcher.add_child_handler(pid, handler, arg)

        remove = self.watcher.remove_child_handler

        # first handler: removed once
        self.assertTrue(remove(54))

        # second handler: removed, then two more attempts
        self.assertTrue(remove(55))
        self.assertFalse(remove(55))
        self.assertFalse(remove(55))

        # every child exits, with codes 0, 1, 2 respectively
        for code, pid in enumerate(handlers):
            self.add_zombie(pid, EXITCODE(code))
        self.running = False
        with self.ignore_warnings:
            self.watcher._sig_chld()

        handlers[54].assert_not_called()
        handlers[55].assert_not_called()
        handlers[56].assert_called_once_with(56, 2, 3)
    1489  
    @waitpid_mocks
    def test_sigchld_unhandled_exception(self, m_waitpid):
        # When the waitpid mock raises, _sig_chld() must return None instead
        # of propagating, report through log.logger.error, and leave the
        # registered handler uncalled.
        callback = mock.Mock()

        # register a child
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(57, callback)

        # make the mock supplied by @waitpid_mocks raise on its next call
        m_waitpid.side_effect = ValueError

        with mock.patch.object(log.logger, 'error') as m_error:
            self.assertIsNone(self.watcher._sig_chld())
            self.assertTrue(m_error.called)

        # no exit status was ever obtained, so nothing may be delivered
        self.assertFalse(callback.called)
    1507  
    @waitpid_mocks
    def test_sigchld_child_reaped_elsewhere(self, m_waitpid):
        # The child's status is collected by a direct os.waitpid() call
        # before the watcher gets to process SIGCHLD.
        pid = 58
        handler = mock.Mock()

        # watch the child
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(pid, handler)

        handler.assert_not_called()

        # the child exits
        self.running = False
        self.add_zombie(pid, EXITCODE(4))

        # somebody else waits for it first
        os.waitpid(pid, os.WNOHANG)

        m_waitpid.reset_mock()

        # now the watcher handles the signal
        with self.ignore_warnings:
            self.watcher._sig_chld()

        if not isinstance(self.watcher, asyncio.FastChildWatcher):
            handler.assert_called_once_with(pid, 255)
        else:
            # here the FastChildWatcher enters a deadlock
            # (there is no way to prevent it)
            handler.assert_not_called()
    1538  
    @waitpid_mocks
    def test_sigchld_unknown_pid_during_registration(self, m_waitpid):
        # Children terminate and _sig_chld() runs before any handler has
        # been added, all inside the watcher's context.
        exited_handler = mock.Mock()
        running_handler = mock.Mock()

        with self.ignore_warnings, self.watcher:
            self.running = True
            # pid 591 terminates and will be registered below
            self.add_zombie(591, EXITCODE(7))
            # pid 593 terminates and is never registered
            self.add_zombie(593, EXITCODE(17))

            self.watcher._sig_chld()

            self.watcher.add_child_handler(591, exited_handler)
            self.watcher.add_child_handler(592, running_handler)

        exited_handler.assert_called_once_with(591, 7)
        running_handler.assert_not_called()
    1559  
    @waitpid_mocks
    def test_set_loop(self, m_waitpid):
        # attach_loop() with a new loop: the SIGCHLD handler is removed from
        # the previous loop and added to the new one, and the registered
        # child handler still works afterwards.
        handler = mock.Mock()

        # watch a child
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(60, handler)

        # switch to a fresh loop
        previous_loop = self.loop
        self.loop = self.new_test_loop()

        with mock.patch.object(
                previous_loop, "remove_signal_handler") as m_old_remove:
            with mock.patch.object(
                    self.loop, "add_signal_handler") as m_new_add:

                self.watcher.attach_loop(self.loop)

                m_old_remove.assert_called_once_with(signal.SIGCHLD)
                m_new_add.assert_called_once_with(
                    signal.SIGCHLD, self.watcher._sig_chld)

        # the child exits with code 9
        self.running = False
        self.add_zombie(60, EXITCODE(9))
        self.watcher._sig_chld()

        handler.assert_called_once_with(60, 9)
    1590  
    @waitpid_mocks
    def test_set_loop_race_condition(self, m_waitpid):
        # Children that terminate while no loop is attached are expected to
        # be reported as soon as a new loop is attached.
        first, second, third = mock.Mock(), mock.Mock(), mock.Mock()

        # watch three children
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(61, first)
            self.watcher.add_child_handler(62, second)
            self.watcher.add_child_handler(622, third)

        # detach the loop
        previous_loop, self.loop = self.loop, None

        with mock.patch.object(
                previous_loop, "remove_signal_handler") as m_remove:

            with self.assertWarnsRegex(
                    RuntimeWarning, 'A loop is being detached'):
                self.watcher.attach_loop(None)

            m_remove.assert_called_once_with(signal.SIGCHLD)

        # the first two children terminate
        self.add_zombie(61, EXITCODE(11))
        self.add_zombie(62, SIGNAL(5))

        # SIGCHLD was not caught
        for handler in (first, second, third):
            handler.assert_not_called()

        # attach a new loop
        self.loop = self.new_test_loop()

        with mock.patch.object(self.loop, "add_signal_handler") as m_add:

            self.watcher.attach_loop(self.loop)

            m_add.assert_called_once_with(
                signal.SIGCHLD, self.watcher._sig_chld)
            first.assert_called_once_with(61, 11)  # race condition!
            second.assert_called_once_with(62, -5)  # race condition!
            third.assert_not_called()

        first.reset_mock()
        second.reset_mock()

        # the third child terminates
        self.running = False
        self.add_zombie(622, EXITCODE(19))
        self.watcher._sig_chld()

        first.assert_not_called()
        second.assert_not_called()
        third.assert_called_once_with(622, 19)
    1652  
    @waitpid_mocks
    def test_close(self, m_waitpid):
        # close() must remove the SIGCHLD handler from the loop and empty
        # the watcher's _callbacks (and _zombies for FastChildWatcher).
        handler = mock.Mock()
        is_fast = isinstance(self.watcher, asyncio.FastChildWatcher)

        with self.watcher:
            self.running = True
            # pid 63 terminates and will be registered below
            self.add_zombie(63, EXITCODE(9))
            # pid 65 terminates and is never registered
            self.add_zombie(65, EXITCODE(18))
            self.watcher._sig_chld()

            self.watcher.add_child_handler(63, handler)
            self.watcher.add_child_handler(64, handler)

            self.assertEqual(len(self.watcher._callbacks), 1)
            if is_fast:
                self.assertEqual(len(self.watcher._zombies), 1)

            with mock.patch.object(
                    self.loop, "remove_signal_handler") as m_remove:

                self.watcher.close()

                m_remove.assert_called_once_with(signal.SIGCHLD)
                self.assertFalse(self.watcher._callbacks)
                if is_fast:
                    self.assertFalse(self.watcher._zombies)
    1684  
    1685  
    1686  class ESC[4;38;5;81mSafeChildWatcherTests (ESC[4;38;5;149mChildWatcherTestsMixin, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1687      def create_watcher(self):
    1688          return asyncio.SafeChildWatcher()
    1689  
    1690  
    1691  class ESC[4;38;5;81mFastChildWatcherTests (ESC[4;38;5;149mChildWatcherTestsMixin, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1692      def create_watcher(self):
    1693          return asyncio.FastChildWatcher()
    1694  
    1695  
    1696  class ESC[4;38;5;81mPolicyTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1697  
    1698      def create_policy(self):
    1699          return asyncio.DefaultEventLoopPolicy()
    1700  
    1701      def test_get_default_child_watcher(self):
    1702          policy = self.create_policy()
    1703          self.assertIsNone(policy._watcher)
    1704  
    1705          watcher = policy.get_child_watcher()
    1706          self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher)
    1707  
    1708          self.assertIs(policy._watcher, watcher)
    1709  
    1710          self.assertIs(watcher, policy.get_child_watcher())
    1711  
    1712      def test_get_child_watcher_after_set(self):
    1713          policy = self.create_policy()
    1714          watcher = asyncio.FastChildWatcher()
    1715  
    1716          policy.set_child_watcher(watcher)
    1717          self.assertIs(policy._watcher, watcher)
    1718          self.assertIs(watcher, policy.get_child_watcher())
    1719  
    1720      def test_get_child_watcher_thread(self):
    1721  
    1722          def f():
    1723              policy.set_event_loop(policy.new_event_loop())
    1724  
    1725              self.assertIsInstance(policy.get_event_loop(),
    1726                                    asyncio.AbstractEventLoop)
    1727              watcher = policy.get_child_watcher()
    1728  
    1729              self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
    1730              self.assertIsNone(watcher._loop)
    1731  
    1732              policy.get_event_loop().close()
    1733  
    1734          policy = self.create_policy()
    1735          policy.set_child_watcher(asyncio.SafeChildWatcher())
    1736  
    1737          th = threading.Thread(target=f)
    1738          th.start()
    1739          th.join()
    1740  
    1741      def test_child_watcher_replace_mainloop_existing(self):
    1742          policy = self.create_policy()
    1743          loop = policy.new_event_loop()
    1744          policy.set_event_loop(loop)
    1745  
    1746          # Explicitly setup SafeChildWatcher,
    1747          # default ThreadedChildWatcher has no _loop property
    1748          watcher = asyncio.SafeChildWatcher()
    1749          policy.set_child_watcher(watcher)
    1750          watcher.attach_loop(loop)
    1751  
    1752          self.assertIs(watcher._loop, loop)
    1753  
    1754          new_loop = policy.new_event_loop()
    1755          policy.set_event_loop(new_loop)
    1756  
    1757          self.assertIs(watcher._loop, new_loop)
    1758  
    1759          policy.set_event_loop(None)
    1760  
    1761          self.assertIs(watcher._loop, None)
    1762  
    1763          loop.close()
    1764          new_loop.close()
    1765  
    1766  
    1767  class ESC[4;38;5;81mTestFunctional(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1768  
    1769      def setUp(self):
    1770          self.loop = asyncio.new_event_loop()
    1771          asyncio.set_event_loop(self.loop)
    1772  
    1773      def tearDown(self):
    1774          self.loop.close()
    1775          asyncio.set_event_loop(None)
    1776  
    1777      def test_add_reader_invalid_argument(self):
    1778          def assert_raises():
    1779              return self.assertRaisesRegex(ValueError, r'Invalid file object')
    1780  
    1781          cb = lambda: None
    1782  
    1783          with assert_raises():
    1784              self.loop.add_reader(object(), cb)
    1785          with assert_raises():
    1786              self.loop.add_writer(object(), cb)
    1787  
    1788          with assert_raises():
    1789              self.loop.remove_reader(object())
    1790          with assert_raises():
    1791              self.loop.remove_writer(object())
    1792  
    1793      def test_add_reader_or_writer_transport_fd(self):
    1794          def assert_raises():
    1795              return self.assertRaisesRegex(
    1796                  RuntimeError,
    1797                  r'File descriptor .* is used by transport')
    1798  
    1799          async def runner():
    1800              tr, pr = await self.loop.create_connection(
    1801                  lambda: asyncio.Protocol(), sock=rsock)
    1802  
    1803              try:
    1804                  cb = lambda: None
    1805  
    1806                  with assert_raises():
    1807                      self.loop.add_reader(rsock, cb)
    1808                  with assert_raises():
    1809                      self.loop.add_reader(rsock.fileno(), cb)
    1810  
    1811                  with assert_raises():
    1812                      self.loop.remove_reader(rsock)
    1813                  with assert_raises():
    1814                      self.loop.remove_reader(rsock.fileno())
    1815  
    1816                  with assert_raises():
    1817                      self.loop.add_writer(rsock, cb)
    1818                  with assert_raises():
    1819                      self.loop.add_writer(rsock.fileno(), cb)
    1820  
    1821                  with assert_raises():
    1822                      self.loop.remove_writer(rsock)
    1823                  with assert_raises():
    1824                      self.loop.remove_writer(rsock.fileno())
    1825  
    1826              finally:
    1827                  tr.close()
    1828  
    1829          rsock, wsock = socket.socketpair()
    1830          try:
    1831              self.loop.run_until_complete(runner())
    1832          finally:
    1833              rsock.close()
    1834              wsock.close()
    1835  
    1836  
    1837  if __name__ == '__main__':
    1838      unittest.main()