python (3.12.0)
1 """Selector event loop for Unix with signal handling."""
2
3 import errno
4 import io
5 import itertools
6 import os
7 import selectors
8 import signal
9 import socket
10 import stat
11 import subprocess
12 import sys
13 import threading
14 import warnings
15
16 from . import base_events
17 from . import base_subprocess
18 from . import constants
19 from . import coroutines
20 from . import events
21 from . import exceptions
22 from . import futures
23 from . import selector_events
24 from . import tasks
25 from . import transports
26 from .log import logger
27
28
# Names exported by ``from <this module> import *``.
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'PidfdChildWatcher',
    'MultiLoopChildWatcher',
    'ThreadedChildWatcher',
    'DefaultEventLoopPolicy',
)


# Refuse to be imported on Windows: the module is built on Unix signals.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
40
41
42 def _sighandler_noop(signum, frame):
43 """Dummy signal handler."""
44 pass
45
46
def waitstatus_to_exitcode(status):
    """Convert a wait status to an exit code.

    Delegates to os.waitstatus_to_exitcode().  If that function rejects
    the status with ValueError, the raw status is returned unchanged so
    that the unexpected value stays visible for debugging.
    """
    try:
        exitcode = os.waitstatus_to_exitcode(status)
    except ValueError:
        # The child exited, but its status is not understood; hand the
        # raw value back instead of failing.
        exitcode = status
    return exitcode
55
56
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def close(self):
        """Close the loop and uninstall the signal handlers it registered.

        While the interpreter is finalizing the handlers are not removed
        one by one; if any are still registered a ResourceWarning is
        emitted and the registry is simply cleared.
        """
        super().close()
        if not sys.is_finalizing():
            # Iterate over a copy: remove_signal_handler() mutates the dict.
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        else:
            if self._signal_handlers:
                warnings.warn(f"Closing the loop {self!r} "
                              f"on interpreter shutdown "
                              f"stage, skipping signal handlers removal",
                              ResourceWarning,
                              source=self)
                self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Treat every non-zero byte of *data* as a received signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine (function) or sig is
        not an int.
        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT gets Python's default handler back; every other signal
        # is reset to the system default action.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is not one of signal.valid_signals().
        """
        if not isinstance(sig, int):
            raise TypeError(f'sig must be an int, not {sig!r}')

        if sig not in signal.valid_signals():
            raise ValueError(f'invalid signal number {sig}')

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Spawn a subprocess and return its transport once it is ready.

        Raise RuntimeError, before any process is created, if the current
        child watcher is not active.
        """
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', DeprecationWarning)
            watcher = events.get_child_watcher()

        with watcher:
            if not watcher.is_active():
                # Check early.
                # Raising exception before process creation
                # prevents subprocess execution if the watcher
                # is not ready to handle it.
                raise RuntimeError("asyncio.get_child_watcher() is not activated, "
                                   "subprocess support is not installed.")
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)
            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                await waiter
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException:
                # Tear the transport down and wait for it before
                # propagating the failure.
                transp.close()
                await transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's exit to its transport from any thread."""
        # Skip one iteration for callbacks to be executed
        self.call_soon_threadsafe(self.call_soon, transp._process_exited, returncode)

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  Return a
        (transport, protocol) pair.  Raise ValueError on inconsistent
        arguments or when *sock* is not an AF_UNIX/SOCK_STREAM socket.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')
            if ssl_handshake_timeout is not None:
                raise ValueError(
                    'ssl_handshake_timeout is only meaningful with ssl')
            if ssl_shutdown_timeout is not None:
                raise ValueError(
                    'ssl_shutdown_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except:
                # The socket was created here, so it must be closed here,
                # whatever went wrong (including cancellation).
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout)
        return transport, protocol

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  When *path* names
        an existing (non-abstract) socket file, that stale file is removed
        before binding.  Return the base_events.Server object.

        Raise TypeError if ssl is a bool, ValueError on inconsistent
        arguments, and OSError if binding fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if ssl_shutdown_timeout is not None and not ssl:
            raise ValueError(
                'ssl_shutdown_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # From here on the socket is owned by this method: every
            # failure path below must close it.  The stale-socket check is
            # kept inside the guarded region so that errors it does not
            # handle itself (e.g. ValueError for an embedded NUL byte, or
            # IndexError for an empty path) no longer leak the socket.
            try:
                # Check for abstract socket. `str` and `bytes` paths are
                # supported.
                if path[0] not in (0, '\x00'):
                    try:
                        if stat.S_ISSOCK(os.stat(path).st_mode):
                            os.remove(path)
                    except FileNotFoundError:
                        pass
                    except OSError as err:
                        # Directory may have permissions only to create
                        # socket.
                        logger.error('Unable to check or remove stale UNIX socket '
                                     '%r: %r', path, err)

                sock.bind(path)
            except OSError as exc:
                # Only bind() can get here: OSErrors from the stale-socket
                # check are consumed by the inner handlers above.
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = f'Address {path!r} is already in use'
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except BaseException:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')

        sock.setblocking(False)
        server = base_events.Server(self, [sock], protocol_factory,
                                    ssl, backlog, ssl_handshake_timeout,
                                    ssl_shutdown_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0)

        return server

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* with os.sendfile().

        Return the number of bytes sent.  Raise
        exceptions.SendfileNotAvailableError when os.sendfile() is
        missing or *file* has no usable file descriptor.
        """
        try:
            os.sendfile
        except AttributeError:
            raise exceptions.SendfileNotAvailableError(
                "os.sendfile() is not available")
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise exceptions.SendfileNotAvailableError("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise exceptions.SendfileNotAvailableError("not a regular file")
        # A falsy count means "send up to the whole file".
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        fut = self.create_future()
        self._sock_sendfile_native_impl(fut, None, sock, fileno,
                                        offset, count, blocksize, 0)
        return await fut

    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and reschedule itself if needed.

        *registered_fd* is None on the first call and the socket fd
        registered with add_writer() on subsequent calls.  The outcome
        is reported through *fut*; nothing is returned.
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            blocksize = count - total_sent
            if blocksize <= 0:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            # Socket not writable yet: retry when the selector says so.
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if (registered_fd is not None and
                    exc.errno == errno.ENOTCONN and
                    type(exc) is not ConnectionError):
                # If we have an ENOTCONN and this isn't a first call to
                # sendfile(), i.e. the connection was closed in the middle
                # of the operation, normalize the error to ConnectionError
                # to make it consistent across all Posix systems.
                new_exc = ConnectionError(
                    "socket is not connected", errno.ENOTCONN)
                new_exc.__cause__ = exc
                exc = new_exc
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = exceptions.SendfileNotAvailableError(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)

    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
        """Seek *fileno* to *offset* if any bytes were sent."""
        if total_sent > 0:
            os.lseek(fileno, offset, os.SEEK_SET)

    def _sock_add_cancellation_callback(self, fut, sock):
        """Unregister the socket's writer callback if *fut* is cancelled."""
        def cb(fut):
            if fut.cancelled():
                fd = sock.fileno()
                # fileno() is -1 once the socket has been closed.
                if fd != -1:
                    self.remove_writer(fd)
        fut.add_done_callback(cb)
459
460
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a pipe, socket or character-device descriptor."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False
        self._paused = False

        st_mode = os.fstat(self._fileno).st_mode
        supported = (stat.S_ISFIFO(st_mode) or
                     stat.S_ISSOCK(st_mode) or
                     stat.S_ISCHR(st_mode))
        if not supported:
            # Drop every reference so that __del__ does not treat this
            # half-built object as an unclosed transport.
            self._pipe = self._fileno = self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        schedule = self._loop.call_soon
        schedule(self._protocol.connection_made, self)
        # Reading starts only after connection_made() has run.
        schedule(self._add_reader, self._fileno, self._read_ready)
        if waiter is not None:
            # The waiter is woken only after connection_made() has run.
            schedule(futures._set_result_unless_cancelled, waiter, None)

    def _add_reader(self, fd, callback):
        """Register *callback* for *fd* unless reading is paused/closing."""
        if self.is_reading():
            self._loop._add_reader(fd, callback)

    def is_reading(self):
        """Return True when the transport is neither paused nor closing."""
        return not (self._paused or self._closing)

    def __repr__(self):
        parts = [self.__class__.__name__]
        if self._pipe is None:
            parts.append('closed')
        elif self._closing:
            parts.append('closing')
        parts.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is None:
            parts.append('closed')
        elif selector is None:
            parts.append('open')
        else:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            parts.append('polling' if polling else 'idle')
        return '<{}>'.format(' '.join(parts))

    def _read_ready(self):
        """Read available data and hand it, or EOF, to the protocol."""
        try:
            chunk = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
            return

        if chunk:
            self._protocol.data_received(chunk)
            return

        # An empty read means the peer closed its end.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._protocol.eof_received)
        self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the descriptor; no-op if not currently reading."""
        if self.is_reading():
            self._paused = True
            self._loop._remove_reader(self._fileno)
            if self._loop.get_debug():
                logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Watch the descriptor again; no-op unless paused and not closing."""
        if self._paused and not self._closing:
            self._paused = False
            self._loop._add_reader(self._fileno, self._read_ready)
            if self._loop.get_debug():
                logger.debug("%r resumes reading", self)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport; subsequent calls are ignored."""
        if self._closing:
            return
        self._close(None)

    def __del__(self, _warn=warnings.warn):
        if self._pipe is None:
            return
        _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
        self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        An OSError whose errno is EIO is only logged (in debug mode);
        anything else goes to the loop's exception handler.
        """
        # should be called by exception handler only
        is_eio = isinstance(exc, OSError) and exc.errno == errno.EIO
        if not is_eio:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        elif self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
        self._close(exc)

    def _close(self, exc):
        """Mark as closing, stop reading and schedule connection_lost()."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the pipe and all references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
603
604
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a pipe, socket or character-device descriptor."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._buffer = bytearray()
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        st_mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(st_mode)
        is_fifo = stat.S_ISFIFO(st_mode)
        is_socket = stat.S_ISSOCK(st_mode)
        if not any((is_char, is_fifo, is_socket)):
            # Drop every reference so that __del__ does not treat this
            # half-built object as an unclosed transport.
            self._pipe = self._fileno = self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        os.set_blocking(self._fileno, False)
        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets.  On other platforms it
        # works for pipes and sockets.  (Exception: OS X 10.4?  Issue #19294.)
        on_aix = sys.platform.startswith("aix")
        if is_socket or (is_fifo and not on_aix):
            # Reading starts only after connection_made() has run.
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # The waiter is woken only after connection_made() has run.
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        parts = [self.__class__.__name__]
        if self._pipe is None:
            parts.append('closed')
        elif self._closing:
            parts.append('closing')
        parts.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is None:
            parts.append('closed')
        elif selector is None:
            parts.append('open')
        else:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_WRITE)
            parts.append('polling' if polling else 'idle')
            parts.append(f'bufsize={self.get_write_buffer_size()}')
        return '<{}>'.format(' '.join(parts))

    def get_write_buffer_size(self):
        """Return the number of bytes waiting in the write buffer."""
        return len(self._buffer)

    def _read_ready(self):
        """Handle the descriptor becoming readable: the peer has closed."""
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        # Unsent data means the close is an error for the protocol.
        self._close(BrokenPipeError() if self._buffer else None)

    def write(self, data):
        """Write *data*, buffering whatever cannot be sent immediately."""
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Nothing is queued, so try a direct write first.
            try:
                written = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                written = 0
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if written == len(data):
                return
            if written > 0:
                data = memoryview(data)[written:]
            self._loop._add_writer(self._fileno, self._write_ready)

        self._buffer += data
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Flush as much of the buffer as the descriptor accepts."""
        assert self._buffer, 'Data should not be empty'

        try:
            written = os.write(self._fileno, self._buffer)
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._buffer.clear()
            self._conn_lost += 1
            # Remove the writer here; _fatal_error() won't do it
            # because _buffer is now empty.
            self._loop._remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return

        if written == len(self._buffer):
            self._buffer.clear()
            self._loop._remove_writer(self._fileno)
            self._maybe_resume_protocol()  # May append to buffer.
            if self._closing:
                self._loop._remove_reader(self._fileno)
                self._call_connection_lost(None)
        elif written > 0:
            del self._buffer[:written]

    def can_write_eof(self):
        return True

    def write_eof(self):
        """Close the write end once buffered data has been flushed."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if self._buffer:
            # _write_ready() finishes the close when the buffer drains.
            return
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport; no-op if already closed or closing."""
        if self._pipe is None or self._closing:
            return
        # write_eof is all what we needed to close the write pipe
        self.write_eof()

    def __del__(self, _warn=warnings.warn):
        if self._pipe is None:
            return
        _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
        self._pipe.close()

    def abort(self):
        """Close immediately, discarding any buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        OSErrors are only logged (in debug mode); anything else goes to
        the loop's exception handler.
        """
        # should be called by exception handler only
        if not isinstance(exc, OSError):
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        elif self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
        self._close(exc)

    def _close(self, exc=None):
        """Mark as closing, drop pending data and schedule connection_lost()."""
        self._closing = True
        if self._buffer:
            self._loop._remove_writer(self._fileno)
        self._buffer.clear()
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the pipe and all references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
808
809
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that launches the child via subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the child process and store it in ``self._proc``."""
        parent_end = None
        if stdin == subprocess.PIPE and sys.platform.startswith('aix'):
            # On AIX a socket pair replaces the stdin pipe: read events
            # cannot be selected on the write end of a pipe there, and
            # that is how closing of the other end is detected.
            stdin, parent_end = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if parent_end is not None:
                # The child holds its own copy of the stdin end; keep only
                # ours, wrapped as the process's stdin file object.
                stdin.close()
                self._proc.stdin = open(parent_end.detach(), 'wb',
                                        buffering=bufsize)
                parent_end = None
        finally:
            # Still set only if something above failed: close both ends.
            if parent_end is not None:
                stdin.close()
                parent_end.close()
832
833
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Subclasses watch a set of subprocesses and report when one of them
    terminates or is interrupted by a signal.

    Callbacks are registered with .add_child_handler().  A new process
    must be started inside a 'with' block so that the watcher can suspend
    its activity until the process is fully registered (some
    implementations need this to avoid a race condition).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Because a child watcher may catch SIGCHLD and call waitpid(-1),
        only one watcher object should be active per process.
    """

    def __init_subclass__(cls) -> None:
        # Subclasses defined inside this module are exempt from the
        # deprecation warning.
        if cls.__module__ == __name__:
            return
        warnings._deprecated(
            "AbstractChildWatcher",
            "{name!r} is deprecated as of Python 3.12 and will be "
            "removed in Python {remove}.",
            remove=(3, 14))

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        callback(pid, returncode, *args) is to be called when process
        'pid' terminates.  Registering a second callback for the same
        process replaces the first.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to a loop is detached from it
        first.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called so that any underlying resource is freed.
        """
        raise NotImplementedError

    def is_active(self):
        """Return ``True`` if the watcher is active and used by the event loop.

        That is, the watcher is installed and ready to handle process
        exit notifications.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        Implementations must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
918
919
class PidfdChildWatcher(AbstractChildWatcher):
    """Child watcher implementation using Linux's pid file descriptors.

    The watcher polls process file descriptors (pidfds) to learn when a
    child terminates.  In some respects it is a "Goldilocks" child watcher:
    it needs neither signals nor threads, does not interfere with
    processes launched outside the event loop, and scales linearly with
    the number of subprocesses launched by the event loop.  Its main
    drawback is that pidfds are Linux-specific and require a recent
    (5.3+) kernel.
    """

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def is_active(self):
        """Always True: this watcher needs no activation."""
        return True

    def close(self):
        """No-op."""
        pass

    def attach_loop(self, loop):
        """No-op: the running loop is looked up when it is needed."""
        pass

    def add_child_handler(self, pid, callback, *args):
        """Watch *pid* through a pidfd registered on the running loop."""
        running_loop = events.get_running_loop()
        pidfd = os.pidfd_open(pid)
        running_loop._add_reader(
            pidfd, self._do_wait, pid, pidfd, callback, args)

    def _do_wait(self, pid, pidfd, callback, args):
        """Reap *pid*, close its pidfd and invoke the callback."""
        events.get_running_loop()._remove_reader(pidfd)
        try:
            status = os.waitpid(pid, 0)[1]
        except ChildProcessError:
            # Somebody else (another waitpid() call) already reaped the
            # child, so its real exit status is unavailable.
            returncode = 255
            logger.warning(
                "child process pid %d exit status already read: "
                " will report returncode 255",
                pid)
        else:
            returncode = waitstatus_to_exitcode(status)

        os.close(pidfd)
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Do nothing and return True.

        asyncio never calls remove_child_handler(); the method exists
        only because the abstract base class requires it.
        """
        return True
976
977
class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for signal-based child watchers."""

    def __init__(self):
        self._loop = None
        # Per-pid registrations; populated by subclasses.
        self._callbacks = {}

    def close(self):
        """Detach from the current loop."""
        self.attach_loop(None)

    def is_active(self):
        """Return True when a loop is attached and that loop is running."""
        loop = self._loop
        return loop is not None and loop.is_running()

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError

    def _do_waitpid_all(self):
        raise NotImplementedError

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, which only detaches.  A RuntimeWarning is
        issued when detaching while callbacks are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None and loop is None and self._callbacks:
            warnings.warn(
                'A loop is being detached '
                'from a child watcher with pending handlers',
                RuntimeWarning)

        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # A child may have terminated during the switch; poll once so
        # that its exit is not missed.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: poll children, routing errors to the loop."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # self._loop is expected to be set here, because
            # attach_loop() is what installs this method as the handler.
            self._loop.call_exception_handler({
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            })
1029
1030
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    On every SIGCHLD each registered pid is polled individually with
    os.waitpid(pid, WNOHANG) rather than reaping with os.waitpid(-1),
    so processes spawned by unrelated code are left alone.

    The price is a pass over all registered children (O(n)) each time
    SIGCHLD is raised.
    """

    def __init__(self):
        super().__init__()
        warnings._deprecated("SafeChildWatcher",
                             "{name!r} is deprecated as of Python 3.12 and will be "
                             "removed in Python {remove}.",
                             remove=(3, 14))

    def close(self):
        """Forget all registered handlers, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* for *pid* and poll that pid immediately."""
        self._callbacks[pid] = (callback, args)

        # The child may already have terminated; check right away so
        # its exit is not missed.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for registered_pid in tuple(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking and fire its callback."""
        assert expected_pid > 0

        try:
            reaped_pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            reaped_pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                reaped_pid)
        else:
            if reaped_pid == 0:
                # The child process is still alive.
                return

            returncode = waitstatus_to_exitcode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            handler, handler_args = self._callbacks.pop(reaped_pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               reaped_pid, exc_info=True)
        else:
            handler(reaped_pid, returncode, *handler_args)
1110
1111
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated child is reaped by calling os.waitpid(-1)
    directly, which may interfere with other code that spawns
    processes and waits for them itself.

    There is no noticeable overhead when handling a big number of
    children (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # pid -> returncode for children reaped before being registered.
        self._zombies = {}
        # Number of currently open "with watcher:" blocks.
        self._forks = 0
        warnings._deprecated("FastChildWatcher",
                             "{name!r} is deprecated as of Python 3.12 and will be "
                             "removed in Python {remove}.",
                             remove=(3, 14))

    def close(self):
        """Drop handlers and remembered zombies, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks:
                # Another "with" block is still open.
                return
            if not self._zombies:
                return

            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* for *pid*, or fire it if *pid* was reaped."""
        assert self._forks, "Must use the context manager"

        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = waitstatus_to_exitcode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    callback = None
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            # The callback (or the warning) runs outside the lock.
            if callback is not None:
                callback(pid, returncode, *args)
            else:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
1219
1220
class MultiLoopChildWatcher(AbstractChildWatcher):
    """A watcher that doesn't require running loop in the main thread.

    A process-wide SIGCHLD handler is installed via signal.signal()
    the first time attach_loop() is called, which may conflict with
    other code that installs its own handler for this signal.

    The solution is safe but it has a significant overhead when
    handling a big number of processes (*O(n)* each time a
    SIGCHLD is received).
    """

    # Implementation note:
    # The class stays compatible with the AbstractChildWatcher ABC.
    # For that purpose attach_loop() does not store the loop, and
    # add_child_handler()/remove_child_handler() take no explicit loop
    # argument; the current loop is obtained with get_running_loop().

    def __init__(self):
        self._callbacks = {}
        self._saved_sighandler = None
        warnings._deprecated("MultiLoopChildWatcher",
                             "{name!r} is deprecated as of Python 3.12 and will be "
                             "removed in Python {remove}.",
                             remove=(3, 14))

    def is_active(self):
        """Return True once the SIGCHLD handler has been installed."""
        return self._saved_sighandler is not None

    def close(self):
        """Forget all handlers and restore the saved SIGCHLD handler."""
        self._callbacks.clear()
        if self._saved_sighandler is None:
            return

        current = signal.getsignal(signal.SIGCHLD)
        if current == self._sig_chld:
            signal.signal(signal.SIGCHLD, self._saved_sighandler)
        else:
            # Somebody replaced our handler; leave theirs in place.
            logger.warning("SIGCHLD handler was changed by outside code")
        self._saved_sighandler = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* for *pid* on the currently running loop."""
        running_loop = events.get_running_loop()
        self._callbacks[pid] = (running_loop, callback, args)

        # The child may already have terminated; check right away so
        # its exit is not missed.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def attach_loop(self, loop):
        """Install the SIGCHLD handler on first call; *loop* is not stored."""
        # Don't save the loop but initialize itself if called first time
        # The reason to do it here is that attach_loop() is called from
        # unix policy only for the main thread.
        # Main thread is required for subscription on SIGCHLD signal
        if self._saved_sighandler is not None:
            return

        self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
        if self._saved_sighandler is None:
            logger.warning("Previous SIGCHLD handler was set by non-Python code, "
                           "restore to default handler on watcher close.")
            self._saved_sighandler = signal.SIG_DFL

        # Set SA_RESTART to limit EINTR occurrences.
        signal.siginterrupt(signal.SIGCHLD, False)

    def _do_waitpid_all(self):
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for registered_pid in tuple(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking and schedule its callback."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
            status_known = False
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = waitstatus_to_exitcode(status)
            status_known = True

        try:
            loop, callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            logger.warning("Child watcher got an unexpected pid: %r",
                           pid, exc_info=True)
            return

        if loop.is_closed():
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            return

        # The debug message is only emitted when the status was actually
        # read here, not for the synthetic 255 return code.
        if status_known and loop.get_debug():
            logger.debug('process %s exited with returncode %s',
                         expected_pid, returncode)
        loop.call_soon_threadsafe(callback, pid, returncode, *args)

    def _sig_chld(self, signum, frame):
        """Process-level SIGCHLD handler: poll all registered children."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
1348
1349
class ThreadedChildWatcher(AbstractChildWatcher):
    """Threaded child watcher implementation.

    A dedicated thread is started for every watched process and blocks
    in os.waitpid() until that process finishes.

    No POSIX signal subscription is needed, but creating a thread is
    not free.

    The watcher has O(1) complexity, its performance doesn't depend
    on amount of spawn processes.
    """

    def __init__(self):
        # Source of the numeric suffix used in waiter thread names.
        self._pid_counter = itertools.count(0)
        # pid -> waiter thread
        self._threads = {}

    def is_active(self):
        return True

    def close(self):
        self._join_threads()

    def _join_threads(self):
        """Internal: Join all non-daemon threads"""
        snapshot = list(self._threads.values())
        joinable = [worker for worker in snapshot
                    if worker.is_alive() and not worker.daemon]
        for worker in joinable:
            worker.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __del__(self, _warn=warnings.warn):
        # warnings.warn is bound as a default argument at definition time.
        snapshot = list(self._threads.values())
        still_running = [worker for worker in snapshot if worker.is_alive()]
        if still_running:
            _warn(f"{self.__class__} has registered but not finished child processes",
                  ResourceWarning,
                  source=self)

    def add_child_handler(self, pid, callback, *args):
        """Start a daemon thread that waits for *pid* and reports its exit."""
        loop = events.get_running_loop()
        waiter = threading.Thread(
            target=self._do_waitpid,
            name=f"waitpid-{next(self._pid_counter)}",
            args=(loop, pid, callback, args),
            daemon=True)
        self._threads[pid] = waiter
        waiter.start()

    def remove_child_handler(self, pid):
        # asyncio never calls remove_child_handler() !!!
        # The method is no-op but is implemented because
        # abstract base classes require it.
        return True

    def attach_loop(self, loop):
        pass

    def _do_waitpid(self, loop, expected_pid, callback, args):
        """Thread body: block until *expected_pid* exits, then notify *loop*."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, 0)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            returncode = waitstatus_to_exitcode(status)
            if loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        if not loop.is_closed():
            # Hand the result over to the loop's own thread.
            loop.call_soon_threadsafe(callback, pid, returncode, *args)
        else:
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)

        self._threads.pop(expected_pid)
1437
def can_use_pidfd():
    """Return True if os.pidfd_open() exists and works for this process.

    The check opens a pidfd for the current process and closes it
    again; an OSError from that probe yields False.
    """
    if hasattr(os, 'pidfd_open'):
        try:
            probe_fd = os.pidfd_open(os.getpid(), 0)
            os.close(probe_fd)
        except OSError:
            # blocked by security policy like SECCOMP
            return False
        return True
    return False
1448
1449
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Child watcher; created lazily by _init_watcher() or supplied
        # through set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default child watcher if none is set yet.

        A PidfdChildWatcher is used when can_use_pidfd() reports that
        pidfds work, otherwise a ThreadedChildWatcher.  When called
        from the main thread, the new watcher is attached to the loop
        stored in this policy's thread-local state.
        """
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                if can_use_pidfd():
                    self._watcher = PidfdChildWatcher()
                else:
                    self._watcher = ThreadedChildWatcher()
                if threading.current_thread() is threading.main_thread():
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if (self._watcher is not None and
                threading.current_thread() is threading.main_thread()):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a watcher is created automatically: a
        PidfdChildWatcher when pidfds are usable, otherwise a
        ThreadedChildWatcher.

        Emits a deprecation warning; the method is scheduled for
        removal in Python 3.14.
        """
        if self._watcher is None:
            self._init_watcher()

        warnings._deprecated("get_child_watcher",
                             "{name!r} is deprecated as of Python 3.12 and will be "
                             "removed in Python {remove}.", remove=(3, 14))
        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        *watcher* must be None or an AbstractChildWatcher instance.  A
        previously set watcher is closed before being replaced.

        Emits a deprecation warning; the method is scheduled for
        removal in Python 3.14.
        """

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        if self._watcher is not None:
            self._watcher.close()

        self._watcher = watcher
        warnings._deprecated("set_child_watcher",
                             "{name!r} is deprecated as of Python 3.12 and will be "
                             "removed in Python {remove}.", remove=(3, 14))
1507
1508
# Public aliases for the Unix implementations defined in this module.
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
SelectorEventLoop = _UnixSelectorEventLoop