python (3.11.7)

(root)/
lib/
python3.11/
asyncio/
base_events.py
       1  """Base implementation of event loop.
       2  
       3  The event loop can be broken up into a multiplexer (the part
       4  responsible for notifying us of I/O events) and the event loop proper,
       5  which wraps a multiplexer with functionality for scheduling callbacks,
       6  immediately or at a given time in the future.
       7  
       8  Whenever a public API takes a callback, subsequent positional
       9  arguments will be passed to the callback if/when it is called.  This
      10  avoids the proliferation of trivial lambdas implementing closures.
      11  Keyword arguments for the callback are not supported; this is a
      12  conscious design decision, leaving the door open for keyword arguments
      13  to modify the meaning of the API call itself.
      14  """
      15  
      16  import collections
      17  import collections.abc
      18  import concurrent.futures
      19  import functools
      20  import heapq
      21  import itertools
      22  import os
      23  import socket
      24  import stat
      25  import subprocess
      26  import threading
      27  import time
      28  import traceback
      29  import sys
      30  import warnings
      31  import weakref
      32  
      33  try:
      34      import ssl
      35  except ImportError:  # pragma: no cover
      36      ssl = None
      37  
      38  from . import constants
      39  from . import coroutines
      40  from . import events
      41  from . import exceptions
      42  from . import futures
      43  from . import protocols
      44  from . import sslproto
      45  from . import staggered
      46  from . import tasks
      47  from . import transports
      48  from . import trsock
      49  from .log import logger
      50  
      51  
      52  __all__ = 'BaseEventLoop','Server',
      53  
      54  
      55  # Minimum number of _scheduled timer handles before cleanup of
      56  # cancelled handles is performed.
      57  _MIN_SCHEDULED_TIMER_HANDLES = 100
      58  
      59  # Minimum fraction of _scheduled timer handles that are cancelled
      60  # before cleanup of cancelled handles is performed.
      61  _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
      62  
      63  
      64  _HAS_IPv6 = hasattr(socket, 'AF_INET6')
      65  
      66  # Maximum timeout passed to select to avoid OS limitations
      67  MAXIMUM_SELECT_TIMEOUT = 24 * 3600
      68  
      69  
      70  def _format_handle(handle):
      71      cb = handle._callback
      72      if isinstance(getattr(cb, '__self__', None), tasks.Task):
      73          # format the task
      74          return repr(cb.__self__)
      75      else:
      76          return str(handle)
      77  
      78  
      79  def _format_pipe(fd):
      80      if fd == subprocess.PIPE:
      81          return '<pipe>'
      82      elif fd == subprocess.STDOUT:
      83          return '<stdout>'
      84      else:
      85          return repr(fd)
      86  
      87  
      88  def _set_reuseport(sock):
      89      if not hasattr(socket, 'SO_REUSEPORT'):
      90          raise ValueError('reuse_port not supported by socket module')
      91      else:
      92          try:
      93              sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
      94          except OSError:
      95              raise ValueError('reuse_port not supported by socket module, '
      96                               'SO_REUSEPORT defined but not implemented.')
      97  
      98  
      99  def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
     100      # Try to skip getaddrinfo if "host" is already an IP. Users might have
     101      # handled name resolution in their own code and pass in resolved IPs.
     102      if not hasattr(socket, 'inet_pton'):
     103          return
     104  
     105      if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
     106              host is None:
     107          return None
     108  
     109      if type == socket.SOCK_STREAM:
     110          proto = socket.IPPROTO_TCP
     111      elif type == socket.SOCK_DGRAM:
     112          proto = socket.IPPROTO_UDP
     113      else:
     114          return None
     115  
     116      if port is None:
     117          port = 0
     118      elif isinstance(port, bytes) and port == b'':
     119          port = 0
     120      elif isinstance(port, str) and port == '':
     121          port = 0
     122      else:
     123          # If port's a service name like "http", don't skip getaddrinfo.
     124          try:
     125              port = int(port)
     126          except (TypeError, ValueError):
     127              return None
     128  
     129      if family == socket.AF_UNSPEC:
     130          afs = [socket.AF_INET]
     131          if _HAS_IPv6:
     132              afs.append(socket.AF_INET6)
     133      else:
     134          afs = [family]
     135  
     136      if isinstance(host, bytes):
     137          host = host.decode('idna')
     138      if '%' in host:
     139          # Linux's inet_pton doesn't accept an IPv6 zone index after host,
     140          # like '::1%lo0'.
     141          return None
     142  
     143      for af in afs:
     144          try:
     145              socket.inet_pton(af, host)
     146              # The host has already been resolved.
     147              if _HAS_IPv6 and af == socket.AF_INET6:
     148                  return af, type, proto, '', (host, port, flowinfo, scopeid)
     149              else:
     150                  return af, type, proto, '', (host, port)
     151          except OSError:
     152              pass
     153  
     154      # "host" is not an IP address.
     155      return None
     156  
     157  
     158  def _interleave_addrinfos(addrinfos, first_address_family_count=1):
     159      """Interleave list of addrinfo tuples by family."""
     160      # Group addresses by family
     161      addrinfos_by_family = collections.OrderedDict()
     162      for addr in addrinfos:
     163          family = addr[0]
     164          if family not in addrinfos_by_family:
     165              addrinfos_by_family[family] = []
     166          addrinfos_by_family[family].append(addr)
     167      addrinfos_lists = list(addrinfos_by_family.values())
     168  
     169      reordered = []
     170      if first_address_family_count > 1:
     171          reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
     172          del addrinfos_lists[0][:first_address_family_count - 1]
     173      reordered.extend(
     174          a for a in itertools.chain.from_iterable(
     175              itertools.zip_longest(*addrinfos_lists)
     176          ) if a is not None)
     177      return reordered
     178  
     179  
     180  def _run_until_complete_cb(fut):
     181      if not fut.cancelled():
     182          exc = fut.exception()
     183          if isinstance(exc, (SystemExit, KeyboardInterrupt)):
     184              # Issue #22429: run_forever() already finished, no need to
     185              # stop it.
     186              return
     187      futures._get_loop(fut).stop()
     188  
     189  
     190  if hasattr(socket, 'TCP_NODELAY'):
     191      def _set_nodelay(sock):
     192          if (sock.family in {socket.AF_INET, socket.AF_INET6} and
     193                  sock.type == socket.SOCK_STREAM and
     194                  sock.proto == socket.IPPROTO_TCP):
     195              sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
     196  else:
     197      def _set_nodelay(sock):
     198          pass
     199  
     200  
     201  def _check_ssl_socket(sock):
     202      if ssl is not None and isinstance(sock, ssl.SSLSocket):
     203          raise TypeError("Socket cannot be of type SSLSocket")
     204  
     205  
     206  class ESC[4;38;5;81m_SendfileFallbackProtocol(ESC[4;38;5;149mprotocolsESC[4;38;5;149m.ESC[4;38;5;149mProtocol):
     207      def __init__(self, transp):
     208          if not isinstance(transp, transports._FlowControlMixin):
     209              raise TypeError("transport should be _FlowControlMixin instance")
     210          self._transport = transp
     211          self._proto = transp.get_protocol()
     212          self._should_resume_reading = transp.is_reading()
     213          self._should_resume_writing = transp._protocol_paused
     214          transp.pause_reading()
     215          transp.set_protocol(self)
     216          if self._should_resume_writing:
     217              self._write_ready_fut = self._transport._loop.create_future()
     218          else:
     219              self._write_ready_fut = None
     220  
     221      async def drain(self):
     222          if self._transport.is_closing():
     223              raise ConnectionError("Connection closed by peer")
     224          fut = self._write_ready_fut
     225          if fut is None:
     226              return
     227          await fut
     228  
     229      def connection_made(self, transport):
     230          raise RuntimeError("Invalid state: "
     231                             "connection should have been established already.")
     232  
     233      def connection_lost(self, exc):
     234          if self._write_ready_fut is not None:
     235              # Never happens if peer disconnects after sending the whole content
     236              # Thus disconnection is always an exception from user perspective
     237              if exc is None:
     238                  self._write_ready_fut.set_exception(
     239                      ConnectionError("Connection is closed by peer"))
     240              else:
     241                  self._write_ready_fut.set_exception(exc)
     242          self._proto.connection_lost(exc)
     243  
     244      def pause_writing(self):
     245          if self._write_ready_fut is not None:
     246              return
     247          self._write_ready_fut = self._transport._loop.create_future()
     248  
     249      def resume_writing(self):
     250          if self._write_ready_fut is None:
     251              return
     252          self._write_ready_fut.set_result(False)
     253          self._write_ready_fut = None
     254  
     255      def data_received(self, data):
     256          raise RuntimeError("Invalid state: reading should be paused")
     257  
     258      def eof_received(self):
     259          raise RuntimeError("Invalid state: reading should be paused")
     260  
     261      async def restore(self):
     262          self._transport.set_protocol(self._proto)
     263          if self._should_resume_reading:
     264              self._transport.resume_reading()
     265          if self._write_ready_fut is not None:
     266              # Cancel the future.
     267              # Basically it has no effect because protocol is switched back,
     268              # no code should wait for it anymore.
     269              self._write_ready_fut.cancel()
     270          if self._should_resume_writing:
     271              self._proto.resume_writing()
     272  
     273  
     274  class ESC[4;38;5;81mServer(ESC[4;38;5;149meventsESC[4;38;5;149m.ESC[4;38;5;149mAbstractServer):
     275  
     276      def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
     277                   ssl_handshake_timeout, ssl_shutdown_timeout=None):
     278          self._loop = loop
     279          self._sockets = sockets
     280          self._active_count = 0
     281          self._waiters = []
     282          self._protocol_factory = protocol_factory
     283          self._backlog = backlog
     284          self._ssl_context = ssl_context
     285          self._ssl_handshake_timeout = ssl_handshake_timeout
     286          self._ssl_shutdown_timeout = ssl_shutdown_timeout
     287          self._serving = False
     288          self._serving_forever_fut = None
     289  
     290      def __repr__(self):
     291          return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
     292  
     293      def _attach(self):
     294          assert self._sockets is not None
     295          self._active_count += 1
     296  
     297      def _detach(self):
     298          assert self._active_count > 0
     299          self._active_count -= 1
     300          if self._active_count == 0 and self._sockets is None:
     301              self._wakeup()
     302  
     303      def _wakeup(self):
     304          waiters = self._waiters
     305          self._waiters = None
     306          for waiter in waiters:
     307              if not waiter.done():
     308                  waiter.set_result(waiter)
     309  
     310      def _start_serving(self):
     311          if self._serving:
     312              return
     313          self._serving = True
     314          for sock in self._sockets:
     315              sock.listen(self._backlog)
     316              self._loop._start_serving(
     317                  self._protocol_factory, sock, self._ssl_context,
     318                  self, self._backlog, self._ssl_handshake_timeout,
     319                  self._ssl_shutdown_timeout)
     320  
     321      def get_loop(self):
     322          return self._loop
     323  
     324      def is_serving(self):
     325          return self._serving
     326  
     327      @property
     328      def sockets(self):
     329          if self._sockets is None:
     330              return ()
     331          return tuple(trsock.TransportSocket(s) for s in self._sockets)
     332  
     333      def close(self):
     334          sockets = self._sockets
     335          if sockets is None:
     336              return
     337          self._sockets = None
     338  
     339          for sock in sockets:
     340              self._loop._stop_serving(sock)
     341  
     342          self._serving = False
     343  
     344          if (self._serving_forever_fut is not None and
     345                  not self._serving_forever_fut.done()):
     346              self._serving_forever_fut.cancel()
     347              self._serving_forever_fut = None
     348  
     349          if self._active_count == 0:
     350              self._wakeup()
     351  
     352      async def start_serving(self):
     353          self._start_serving()
     354          # Skip one loop iteration so that all 'loop.add_reader'
     355          # go through.
     356          await tasks.sleep(0)
     357  
     358      async def serve_forever(self):
     359          if self._serving_forever_fut is not None:
     360              raise RuntimeError(
     361                  f'server {self!r} is already being awaited on serve_forever()')
     362          if self._sockets is None:
     363              raise RuntimeError(f'server {self!r} is closed')
     364  
     365          self._start_serving()
     366          self._serving_forever_fut = self._loop.create_future()
     367  
     368          try:
     369              await self._serving_forever_fut
     370          except exceptions.CancelledError:
     371              try:
     372                  self.close()
     373                  await self.wait_closed()
     374              finally:
     375                  raise
     376          finally:
     377              self._serving_forever_fut = None
     378  
     379      async def wait_closed(self):
     380          if self._sockets is None or self._waiters is None:
     381              return
     382          waiter = self._loop.create_future()
     383          self._waiters.append(waiter)
     384          await waiter
     385  
     386  
     387  class ESC[4;38;5;81mBaseEventLoop(ESC[4;38;5;149meventsESC[4;38;5;149m.ESC[4;38;5;149mAbstractEventLoop):
     388  
     389      def __init__(self):
     390          self._timer_cancelled_count = 0
     391          self._closed = False
     392          self._stopping = False
     393          self._ready = collections.deque()
     394          self._scheduled = []
     395          self._default_executor = None
     396          self._internal_fds = 0
     397          # Identifier of the thread running the event loop, or None if the
     398          # event loop is not running
     399          self._thread_id = None
     400          self._clock_resolution = time.get_clock_info('monotonic').resolution
     401          self._exception_handler = None
     402          self.set_debug(coroutines._is_debug_mode())
     403          # In debug mode, if the execution of a callback or a step of a task
     404          # exceed this duration in seconds, the slow callback/task is logged.
     405          self.slow_callback_duration = 0.1
     406          self._current_handle = None
     407          self._task_factory = None
     408          self._coroutine_origin_tracking_enabled = False
     409          self._coroutine_origin_tracking_saved_depth = None
     410  
     411          # A weak set of all asynchronous generators that are
     412          # being iterated by the loop.
     413          self._asyncgens = weakref.WeakSet()
     414          # Set to True when `loop.shutdown_asyncgens` is called.
     415          self._asyncgens_shutdown_called = False
     416          # Set to True when `loop.shutdown_default_executor` is called.
     417          self._executor_shutdown_called = False
     418  
     419      def __repr__(self):
     420          return (
     421              f'<{self.__class__.__name__} running={self.is_running()} '
     422              f'closed={self.is_closed()} debug={self.get_debug()}>'
     423          )
     424  
     425      def create_future(self):
     426          """Create a Future object attached to the loop."""
     427          return futures.Future(loop=self)
     428  
     429      def create_task(self, coro, *, name=None, context=None):
     430          """Schedule a coroutine object.
     431  
     432          Return a task object.
     433          """
     434          self._check_closed()
     435          if self._task_factory is None:
     436              task = tasks.Task(coro, loop=self, name=name, context=context)
     437              if task._source_traceback:
     438                  del task._source_traceback[-1]
     439          else:
     440              if context is None:
     441                  # Use legacy API if context is not needed
     442                  task = self._task_factory(self, coro)
     443              else:
     444                  task = self._task_factory(self, coro, context=context)
     445  
     446              tasks._set_task_name(task, name)
     447  
     448          return task
     449  
     450      def set_task_factory(self, factory):
     451          """Set a task factory that will be used by loop.create_task().
     452  
     453          If factory is None the default task factory will be set.
     454  
     455          If factory is a callable, it should have a signature matching
     456          '(loop, coro)', where 'loop' will be a reference to the active
     457          event loop, 'coro' will be a coroutine object.  The callable
     458          must return a Future.
     459          """
     460          if factory is not None and not callable(factory):
     461              raise TypeError('task factory must be a callable or None')
     462          self._task_factory = factory
     463  
     464      def get_task_factory(self):
     465          """Return a task factory, or None if the default one is in use."""
     466          return self._task_factory
     467  
     468      def _make_socket_transport(self, sock, protocol, waiter=None, *,
     469                                 extra=None, server=None):
     470          """Create socket transport."""
     471          raise NotImplementedError
     472  
     473      def _make_ssl_transport(
     474              self, rawsock, protocol, sslcontext, waiter=None,
     475              *, server_side=False, server_hostname=None,
     476              extra=None, server=None,
     477              ssl_handshake_timeout=None,
     478              ssl_shutdown_timeout=None,
     479              call_connection_made=True):
     480          """Create SSL transport."""
     481          raise NotImplementedError
     482  
     483      def _make_datagram_transport(self, sock, protocol,
     484                                   address=None, waiter=None, extra=None):
     485          """Create datagram transport."""
     486          raise NotImplementedError
     487  
     488      def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
     489                                    extra=None):
     490          """Create read pipe transport."""
     491          raise NotImplementedError
     492  
     493      def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
     494                                     extra=None):
     495          """Create write pipe transport."""
     496          raise NotImplementedError
     497  
     498      async def _make_subprocess_transport(self, protocol, args, shell,
     499                                           stdin, stdout, stderr, bufsize,
     500                                           extra=None, **kwargs):
     501          """Create subprocess transport."""
     502          raise NotImplementedError
     503  
     504      def _write_to_self(self):
     505          """Write a byte to self-pipe, to wake up the event loop.
     506  
     507          This may be called from a different thread.
     508  
     509          The subclass is responsible for implementing the self-pipe.
     510          """
     511          raise NotImplementedError
     512  
     513      def _process_events(self, event_list):
     514          """Process selector events."""
     515          raise NotImplementedError
     516  
     517      def _check_closed(self):
     518          if self._closed:
     519              raise RuntimeError('Event loop is closed')
     520  
     521      def _check_default_executor(self):
     522          if self._executor_shutdown_called:
     523              raise RuntimeError('Executor shutdown has been called')
     524  
     525      def _asyncgen_finalizer_hook(self, agen):
     526          self._asyncgens.discard(agen)
     527          if not self.is_closed():
     528              self.call_soon_threadsafe(self.create_task, agen.aclose())
     529  
     530      def _asyncgen_firstiter_hook(self, agen):
     531          if self._asyncgens_shutdown_called:
     532              warnings.warn(
     533                  f"asynchronous generator {agen!r} was scheduled after "
     534                  f"loop.shutdown_asyncgens() call",
     535                  ResourceWarning, source=self)
     536  
     537          self._asyncgens.add(agen)
     538  
     539      async def shutdown_asyncgens(self):
     540          """Shutdown all active asynchronous generators."""
     541          self._asyncgens_shutdown_called = True
     542  
     543          if not len(self._asyncgens):
     544              # If Python version is <3.6 or we don't have any asynchronous
     545              # generators alive.
     546              return
     547  
     548          closing_agens = list(self._asyncgens)
     549          self._asyncgens.clear()
     550  
     551          results = await tasks.gather(
     552              *[ag.aclose() for ag in closing_agens],
     553              return_exceptions=True)
     554  
     555          for result, agen in zip(results, closing_agens):
     556              if isinstance(result, Exception):
     557                  self.call_exception_handler({
     558                      'message': f'an error occurred during closing of '
     559                                 f'asynchronous generator {agen!r}',
     560                      'exception': result,
     561                      'asyncgen': agen
     562                  })
     563  
     564      async def shutdown_default_executor(self):
     565          """Schedule the shutdown of the default executor."""
     566          self._executor_shutdown_called = True
     567          if self._default_executor is None:
     568              return
     569          future = self.create_future()
     570          thread = threading.Thread(target=self._do_shutdown, args=(future,))
     571          thread.start()
     572          try:
     573              await future
     574          finally:
     575              thread.join()
     576  
     577      def _do_shutdown(self, future):
     578          try:
     579              self._default_executor.shutdown(wait=True)
     580              if not self.is_closed():
     581                  self.call_soon_threadsafe(future.set_result, None)
     582          except Exception as ex:
     583              if not self.is_closed():
     584                  self.call_soon_threadsafe(future.set_exception, ex)
     585  
     586      def _check_running(self):
     587          if self.is_running():
     588              raise RuntimeError('This event loop is already running')
     589          if events._get_running_loop() is not None:
     590              raise RuntimeError(
     591                  'Cannot run the event loop while another loop is running')
     592  
     593      def run_forever(self):
     594          """Run until stop() is called."""
     595          self._check_closed()
     596          self._check_running()
     597          self._set_coroutine_origin_tracking(self._debug)
     598  
     599          old_agen_hooks = sys.get_asyncgen_hooks()
     600          try:
     601              self._thread_id = threading.get_ident()
     602              sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
     603                                     finalizer=self._asyncgen_finalizer_hook)
     604  
     605              events._set_running_loop(self)
     606              while True:
     607                  self._run_once()
     608                  if self._stopping:
     609                      break
     610          finally:
     611              self._stopping = False
     612              self._thread_id = None
     613              events._set_running_loop(None)
     614              self._set_coroutine_origin_tracking(False)
     615              sys.set_asyncgen_hooks(*old_agen_hooks)
     616  
     617      def run_until_complete(self, future):
     618          """Run until the Future is done.
     619  
     620          If the argument is a coroutine, it is wrapped in a Task.
     621  
     622          WARNING: It would be disastrous to call run_until_complete()
     623          with the same coroutine twice -- it would wrap it in two
     624          different Tasks and that can't be good.
     625  
     626          Return the Future's result, or raise its exception.
     627          """
     628          self._check_closed()
     629          self._check_running()
     630  
     631          new_task = not futures.isfuture(future)
     632          future = tasks.ensure_future(future, loop=self)
     633          if new_task:
     634              # An exception is raised if the future didn't complete, so there
     635              # is no need to log the "destroy pending task" message
     636              future._log_destroy_pending = False
     637  
     638          future.add_done_callback(_run_until_complete_cb)
     639          try:
     640              self.run_forever()
     641          except:
     642              if new_task and future.done() and not future.cancelled():
     643                  # The coroutine raised a BaseException. Consume the exception
     644                  # to not log a warning, the caller doesn't have access to the
     645                  # local task.
     646                  future.exception()
     647              raise
     648          finally:
     649              future.remove_done_callback(_run_until_complete_cb)
     650          if not future.done():
     651              raise RuntimeError('Event loop stopped before Future completed.')
     652  
     653          return future.result()
     654  
     655      def stop(self):
     656          """Stop running the event loop.
     657  
     658          Every callback already scheduled will still run.  This simply informs
     659          run_forever to stop looping after a complete iteration.
     660          """
     661          self._stopping = True
     662  
     663      def close(self):
     664          """Close the event loop.
     665  
     666          This clears the queues and shuts down the executor,
     667          but does not wait for the executor to finish.
     668  
     669          The event loop must not be running.
     670          """
     671          if self.is_running():
     672              raise RuntimeError("Cannot close a running event loop")
     673          if self._closed:
     674              return
     675          if self._debug:
     676              logger.debug("Close %r", self)
     677          self._closed = True
     678          self._ready.clear()
     679          self._scheduled.clear()
     680          self._executor_shutdown_called = True
     681          executor = self._default_executor
     682          if executor is not None:
     683              self._default_executor = None
     684              executor.shutdown(wait=False)
     685  
     686      def is_closed(self):
     687          """Returns True if the event loop was closed."""
     688          return self._closed
     689  
     690      def __del__(self, _warn=warnings.warn):
     691          if not self.is_closed():
     692              _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
     693              if not self.is_running():
     694                  self.close()
     695  
     696      def is_running(self):
     697          """Returns True if the event loop is running."""
     698          return (self._thread_id is not None)
     699  
     700      def time(self):
     701          """Return the time according to the event loop's clock.
     702  
     703          This is a float expressed in seconds since an epoch, but the
     704          epoch, precision, accuracy and drift are unspecified and may
     705          differ per event loop.
     706          """
     707          return time.monotonic()
     708  
     709      def call_later(self, delay, callback, *args, context=None):
     710          """Arrange for a callback to be called at a given time.
     711  
     712          Return a Handle: an opaque object with a cancel() method that
     713          can be used to cancel the call.
     714  
     715          The delay can be an int or float, expressed in seconds.  It is
     716          always relative to the current time.
     717  
     718          Each callback will be called exactly once.  If two callbacks
     719          are scheduled for exactly the same time, it is undefined which
     720          will be called first.
     721  
     722          Any positional arguments after the callback will be passed to
     723          the callback when it is called.
     724          """
     725          if delay is None:
     726              raise TypeError('delay must not be None')
     727          timer = self.call_at(self.time() + delay, callback, *args,
     728                               context=context)
     729          if timer._source_traceback:
     730              del timer._source_traceback[-1]
     731          return timer
     732  
     733      def call_at(self, when, callback, *args, context=None):
     734          """Like call_later(), but uses an absolute time.
     735  
     736          Absolute time corresponds to the event loop's time() method.
     737          """
     738          if when is None:
     739              raise TypeError("when cannot be None")
     740          self._check_closed()
     741          if self._debug:
     742              self._check_thread()
     743              self._check_callback(callback, 'call_at')
     744          timer = events.TimerHandle(when, callback, args, self, context)
     745          if timer._source_traceback:
     746              del timer._source_traceback[-1]
     747          heapq.heappush(self._scheduled, timer)
     748          timer._scheduled = True
     749          return timer
     750  
     751      def call_soon(self, callback, *args, context=None):
     752          """Arrange for a callback to be called as soon as possible.
     753  
     754          This operates as a FIFO queue: callbacks are called in the
     755          order in which they are registered.  Each callback will be
     756          called exactly once.
     757  
     758          Any positional arguments after the callback will be passed to
     759          the callback when it is called.
     760          """
     761          self._check_closed()
     762          if self._debug:
     763              self._check_thread()
     764              self._check_callback(callback, 'call_soon')
     765          handle = self._call_soon(callback, args, context)
     766          if handle._source_traceback:
     767              del handle._source_traceback[-1]
     768          return handle
     769  
     770      def _check_callback(self, callback, method):
     771          if (coroutines.iscoroutine(callback) or
     772                  coroutines.iscoroutinefunction(callback)):
     773              raise TypeError(
     774                  f"coroutines cannot be used with {method}()")
     775          if not callable(callback):
     776              raise TypeError(
     777                  f'a callable object was expected by {method}(), '
     778                  f'got {callback!r}')
     779  
     780      def _call_soon(self, callback, args, context):
     781          handle = events.Handle(callback, args, self, context)
     782          if handle._source_traceback:
     783              del handle._source_traceback[-1]
     784          self._ready.append(handle)
     785          return handle
     786  
     787      def _check_thread(self):
     788          """Check that the current thread is the thread running the event loop.
     789  
     790          Non-thread-safe methods of this class make this assumption and will
     791          likely behave incorrectly when the assumption is violated.
     792  
     793          Should only be called when (self._debug == True).  The caller is
     794          responsible for checking this condition for performance reasons.
     795          """
     796          if self._thread_id is None:
     797              return
     798          thread_id = threading.get_ident()
     799          if thread_id != self._thread_id:
     800              raise RuntimeError(
     801                  "Non-thread-safe operation invoked on an event loop other "
     802                  "than the current one")
     803  
     804      def call_soon_threadsafe(self, callback, *args, context=None):
     805          """Like call_soon(), but thread-safe."""
     806          self._check_closed()
     807          if self._debug:
     808              self._check_callback(callback, 'call_soon_threadsafe')
     809          handle = self._call_soon(callback, args, context)
     810          if handle._source_traceback:
     811              del handle._source_traceback[-1]
     812          self._write_to_self()
     813          return handle
     814  
     815      def run_in_executor(self, executor, func, *args):
     816          self._check_closed()
     817          if self._debug:
     818              self._check_callback(func, 'run_in_executor')
     819          if executor is None:
     820              executor = self._default_executor
     821              # Only check when the default executor is being used
     822              self._check_default_executor()
     823              if executor is None:
     824                  executor = concurrent.futures.ThreadPoolExecutor(
     825                      thread_name_prefix='asyncio'
     826                  )
     827                  self._default_executor = executor
     828          return futures.wrap_future(
     829              executor.submit(func, *args), loop=self)
     830  
     831      def set_default_executor(self, executor):
     832          if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
     833              raise TypeError('executor must be ThreadPoolExecutor instance')
     834          self._default_executor = executor
     835  
     836      def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
     837          msg = [f"{host}:{port!r}"]
     838          if family:
     839              msg.append(f'family={family!r}')
     840          if type:
     841              msg.append(f'type={type!r}')
     842          if proto:
     843              msg.append(f'proto={proto!r}')
     844          if flags:
     845              msg.append(f'flags={flags!r}')
     846          msg = ', '.join(msg)
     847          logger.debug('Get address info %s', msg)
     848  
     849          t0 = self.time()
     850          addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
     851          dt = self.time() - t0
     852  
     853          msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
     854          if dt >= self.slow_callback_duration:
     855              logger.info(msg)
     856          else:
     857              logger.debug(msg)
     858          return addrinfo
     859  
     860      async def getaddrinfo(self, host, port, *,
     861                            family=0, type=0, proto=0, flags=0):
     862          if self._debug:
     863              getaddr_func = self._getaddrinfo_debug
     864          else:
     865              getaddr_func = socket.getaddrinfo
     866  
     867          return await self.run_in_executor(
     868              None, getaddr_func, host, port, family, type, proto, flags)
     869  
     870      async def getnameinfo(self, sockaddr, flags=0):
     871          return await self.run_in_executor(
     872              None, socket.getnameinfo, sockaddr, flags)
     873  
     874      async def sock_sendfile(self, sock, file, offset=0, count=None,
     875                              *, fallback=True):
     876          if self._debug and sock.gettimeout() != 0:
     877              raise ValueError("the socket must be non-blocking")
     878          _check_ssl_socket(sock)
     879          self._check_sendfile_params(sock, file, offset, count)
     880          try:
     881              return await self._sock_sendfile_native(sock, file,
     882                                                      offset, count)
     883          except exceptions.SendfileNotAvailableError as exc:
     884              if not fallback:
     885                  raise
     886          return await self._sock_sendfile_fallback(sock, file,
     887                                                    offset, count)
     888  
     889      async def _sock_sendfile_native(self, sock, file, offset, count):
     890          # NB: sendfile syscall is not supported for SSL sockets and
     891          # non-mmap files even if sendfile is supported by OS
     892          raise exceptions.SendfileNotAvailableError(
     893              f"syscall sendfile is not available for socket {sock!r} "
     894              f"and file {file!r} combination")
     895  
     896      async def _sock_sendfile_fallback(self, sock, file, offset, count):
     897          if offset:
     898              file.seek(offset)
     899          blocksize = (
     900              min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
     901              if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
     902          )
     903          buf = bytearray(blocksize)
     904          total_sent = 0
     905          try:
     906              while True:
     907                  if count:
     908                      blocksize = min(count - total_sent, blocksize)
     909                      if blocksize <= 0:
     910                          break
     911                  view = memoryview(buf)[:blocksize]
     912                  read = await self.run_in_executor(None, file.readinto, view)
     913                  if not read:
     914                      break  # EOF
     915                  await self.sock_sendall(sock, view[:read])
     916                  total_sent += read
     917              return total_sent
     918          finally:
     919              if total_sent > 0 and hasattr(file, 'seek'):
     920                  file.seek(offset + total_sent)
     921  
     922      def _check_sendfile_params(self, sock, file, offset, count):
     923          if 'b' not in getattr(file, 'mode', 'b'):
     924              raise ValueError("file should be opened in binary mode")
     925          if not sock.type == socket.SOCK_STREAM:
     926              raise ValueError("only SOCK_STREAM type sockets are supported")
     927          if count is not None:
     928              if not isinstance(count, int):
     929                  raise TypeError(
     930                      "count must be a positive integer (got {!r})".format(count))
     931              if count <= 0:
     932                  raise ValueError(
     933                      "count must be a positive integer (got {!r})".format(count))
     934          if not isinstance(offset, int):
     935              raise TypeError(
     936                  "offset must be a non-negative integer (got {!r})".format(
     937                      offset))
     938          if offset < 0:
     939              raise ValueError(
     940                  "offset must be a non-negative integer (got {!r})".format(
     941                      offset))
     942  
     943      async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
     944          """Create, bind and connect one socket."""
     945          my_exceptions = []
     946          exceptions.append(my_exceptions)
     947          family, type_, proto, _, address = addr_info
     948          sock = None
     949          try:
     950              sock = socket.socket(family=family, type=type_, proto=proto)
     951              sock.setblocking(False)
     952              if local_addr_infos is not None:
     953                  for lfamily, _, _, _, laddr in local_addr_infos:
     954                      # skip local addresses of different family
     955                      if lfamily != family:
     956                          continue
     957                      try:
     958                          sock.bind(laddr)
     959                          break
     960                      except OSError as exc:
     961                          msg = (
     962                              f'error while attempting to bind on '
     963                              f'address {laddr!r}: '
     964                              f'{exc.strerror.lower()}'
     965                          )
     966                          exc = OSError(exc.errno, msg)
     967                          my_exceptions.append(exc)
     968                  else:  # all bind attempts failed
     969                      if my_exceptions:
     970                          raise my_exceptions.pop()
     971                      else:
     972                          raise OSError(f"no matching local address with {family=} found")
     973              await self.sock_connect(sock, address)
     974              return sock
     975          except OSError as exc:
     976              my_exceptions.append(exc)
     977              if sock is not None:
     978                  sock.close()
     979              raise
     980          except:
     981              if sock is not None:
     982                  sock.close()
     983              raise
     984          finally:
     985              exceptions = my_exceptions = None
     986  
     987      async def create_connection(
     988              self, protocol_factory, host=None, port=None,
     989              *, ssl=None, family=0,
     990              proto=0, flags=0, sock=None,
     991              local_addr=None, server_hostname=None,
     992              ssl_handshake_timeout=None,
     993              ssl_shutdown_timeout=None,
     994              happy_eyeballs_delay=None, interleave=None):
     995          """Connect to a TCP server.
     996  
     997          Create a streaming transport connection to a given internet host and
     998          port: socket family AF_INET or socket.AF_INET6 depending on host (or
     999          family if specified), socket type SOCK_STREAM. protocol_factory must be
    1000          a callable returning a protocol instance.
    1001  
    1002          This method is a coroutine which will try to establish the connection
    1003          in the background.  When successful, the coroutine returns a
    1004          (transport, protocol) pair.
    1005          """
    1006          if server_hostname is not None and not ssl:
    1007              raise ValueError('server_hostname is only meaningful with ssl')
    1008  
    1009          if server_hostname is None and ssl:
    1010              # Use host as default for server_hostname.  It is an error
    1011              # if host is empty or not set, e.g. when an
    1012              # already-connected socket was passed or when only a port
    1013              # is given.  To avoid this error, you can pass
    1014              # server_hostname='' -- this will bypass the hostname
    1015              # check.  (This also means that if host is a numeric
    1016              # IP/IPv6 address, we will attempt to verify that exact
    1017              # address; this will probably fail, but it is possible to
    1018              # create a certificate for a specific IP address, so we
    1019              # don't judge it here.)
    1020              if not host:
    1021                  raise ValueError('You must set server_hostname '
    1022                                   'when using ssl without a host')
    1023              server_hostname = host
    1024  
    1025          if ssl_handshake_timeout is not None and not ssl:
    1026              raise ValueError(
    1027                  'ssl_handshake_timeout is only meaningful with ssl')
    1028  
    1029          if ssl_shutdown_timeout is not None and not ssl:
    1030              raise ValueError(
    1031                  'ssl_shutdown_timeout is only meaningful with ssl')
    1032  
    1033          if sock is not None:
    1034              _check_ssl_socket(sock)
    1035  
    1036          if happy_eyeballs_delay is not None and interleave is None:
    1037              # If using happy eyeballs, default to interleave addresses by family
    1038              interleave = 1
    1039  
    1040          if host is not None or port is not None:
    1041              if sock is not None:
    1042                  raise ValueError(
    1043                      'host/port and sock can not be specified at the same time')
    1044  
    1045              infos = await self._ensure_resolved(
    1046                  (host, port), family=family,
    1047                  type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
    1048              if not infos:
    1049                  raise OSError('getaddrinfo() returned empty list')
    1050  
    1051              if local_addr is not None:
    1052                  laddr_infos = await self._ensure_resolved(
    1053                      local_addr, family=family,
    1054                      type=socket.SOCK_STREAM, proto=proto,
    1055                      flags=flags, loop=self)
    1056                  if not laddr_infos:
    1057                      raise OSError('getaddrinfo() returned empty list')
    1058              else:
    1059                  laddr_infos = None
    1060  
    1061              if interleave:
    1062                  infos = _interleave_addrinfos(infos, interleave)
    1063  
    1064              exceptions = []
    1065              if happy_eyeballs_delay is None:
    1066                  # not using happy eyeballs
    1067                  for addrinfo in infos:
    1068                      try:
    1069                          sock = await self._connect_sock(
    1070                              exceptions, addrinfo, laddr_infos)
    1071                          break
    1072                      except OSError:
    1073                          continue
    1074              else:  # using happy eyeballs
    1075                  sock, _, _ = await staggered.staggered_race(
    1076                      (functools.partial(self._connect_sock,
    1077                                         exceptions, addrinfo, laddr_infos)
    1078                       for addrinfo in infos),
    1079                      happy_eyeballs_delay, loop=self)
    1080  
    1081              if sock is None:
    1082                  exceptions = [exc for sub in exceptions for exc in sub]
    1083                  try:
    1084                      if len(exceptions) == 1:
    1085                          raise exceptions[0]
    1086                      else:
    1087                          # If they all have the same str(), raise one.
    1088                          model = str(exceptions[0])
    1089                          if all(str(exc) == model for exc in exceptions):
    1090                              raise exceptions[0]
    1091                          # Raise a combined exception so the user can see all
    1092                          # the various error messages.
    1093                          raise OSError('Multiple exceptions: {}'.format(
    1094                              ', '.join(str(exc) for exc in exceptions)))
    1095                  finally:
    1096                      exceptions = None
    1097  
    1098          else:
    1099              if sock is None:
    1100                  raise ValueError(
    1101                      'host and port was not specified and no sock specified')
    1102              if sock.type != socket.SOCK_STREAM:
    1103                  # We allow AF_INET, AF_INET6, AF_UNIX as long as they
    1104                  # are SOCK_STREAM.
    1105                  # We support passing AF_UNIX sockets even though we have
    1106                  # a dedicated API for that: create_unix_connection.
    1107                  # Disallowing AF_UNIX in this method, breaks backwards
    1108                  # compatibility.
    1109                  raise ValueError(
    1110                      f'A Stream Socket was expected, got {sock!r}')
    1111  
    1112          transport, protocol = await self._create_connection_transport(
    1113              sock, protocol_factory, ssl, server_hostname,
    1114              ssl_handshake_timeout=ssl_handshake_timeout,
    1115              ssl_shutdown_timeout=ssl_shutdown_timeout)
    1116          if self._debug:
    1117              # Get the socket from the transport because SSL transport closes
    1118              # the old socket and creates a new SSL socket
    1119              sock = transport.get_extra_info('socket')
    1120              logger.debug("%r connected to %s:%r: (%r, %r)",
    1121                           sock, host, port, transport, protocol)
    1122          return transport, protocol
    1123  
    1124      async def _create_connection_transport(
    1125              self, sock, protocol_factory, ssl,
    1126              server_hostname, server_side=False,
    1127              ssl_handshake_timeout=None,
    1128              ssl_shutdown_timeout=None):
    1129  
    1130          sock.setblocking(False)
    1131  
    1132          protocol = protocol_factory()
    1133          waiter = self.create_future()
    1134          if ssl:
    1135              sslcontext = None if isinstance(ssl, bool) else ssl
    1136              transport = self._make_ssl_transport(
    1137                  sock, protocol, sslcontext, waiter,
    1138                  server_side=server_side, server_hostname=server_hostname,
    1139                  ssl_handshake_timeout=ssl_handshake_timeout,
    1140                  ssl_shutdown_timeout=ssl_shutdown_timeout)
    1141          else:
    1142              transport = self._make_socket_transport(sock, protocol, waiter)
    1143  
    1144          try:
    1145              await waiter
    1146          except:
    1147              transport.close()
    1148              raise
    1149  
    1150          return transport, protocol
    1151  
    1152      async def sendfile(self, transport, file, offset=0, count=None,
    1153                         *, fallback=True):
    1154          """Send a file to transport.
    1155  
    1156          Return the total number of bytes which were sent.
    1157  
    1158          The method uses high-performance os.sendfile if available.
    1159  
    1160          file must be a regular file object opened in binary mode.
    1161  
    1162          offset tells from where to start reading the file. If specified,
    1163          count is the total number of bytes to transmit as opposed to
    1164          sending the file until EOF is reached. File position is updated on
    1165          return or also in case of error in which case file.tell()
    1166          can be used to figure out the number of bytes
    1167          which were sent.
    1168  
    1169          fallback set to True makes asyncio to manually read and send
    1170          the file when the platform does not support the sendfile syscall
    1171          (e.g. Windows or SSL socket on Unix).
    1172  
    1173          Raise SendfileNotAvailableError if the system does not support
    1174          sendfile syscall and fallback is False.
    1175          """
    1176          if transport.is_closing():
    1177              raise RuntimeError("Transport is closing")
    1178          mode = getattr(transport, '_sendfile_compatible',
    1179                         constants._SendfileMode.UNSUPPORTED)
    1180          if mode is constants._SendfileMode.UNSUPPORTED:
    1181              raise RuntimeError(
    1182                  f"sendfile is not supported for transport {transport!r}")
    1183          if mode is constants._SendfileMode.TRY_NATIVE:
    1184              try:
    1185                  return await self._sendfile_native(transport, file,
    1186                                                     offset, count)
    1187              except exceptions.SendfileNotAvailableError as exc:
    1188                  if not fallback:
    1189                      raise
    1190  
    1191          if not fallback:
    1192              raise RuntimeError(
    1193                  f"fallback is disabled and native sendfile is not "
    1194                  f"supported for transport {transport!r}")
    1195  
    1196          return await self._sendfile_fallback(transport, file,
    1197                                               offset, count)
    1198  
    1199      async def _sendfile_native(self, transp, file, offset, count):
    1200          raise exceptions.SendfileNotAvailableError(
    1201              "sendfile syscall is not supported")
    1202  
    1203      async def _sendfile_fallback(self, transp, file, offset, count):
    1204          if offset:
    1205              file.seek(offset)
    1206          blocksize = min(count, 16384) if count else 16384
    1207          buf = bytearray(blocksize)
    1208          total_sent = 0
    1209          proto = _SendfileFallbackProtocol(transp)
    1210          try:
    1211              while True:
    1212                  if count:
    1213                      blocksize = min(count - total_sent, blocksize)
    1214                      if blocksize <= 0:
    1215                          return total_sent
    1216                  view = memoryview(buf)[:blocksize]
    1217                  read = await self.run_in_executor(None, file.readinto, view)
    1218                  if not read:
    1219                      return total_sent  # EOF
    1220                  await proto.drain()
    1221                  transp.write(view[:read])
    1222                  total_sent += read
    1223          finally:
    1224              if total_sent > 0 and hasattr(file, 'seek'):
    1225                  file.seek(offset + total_sent)
    1226              await proto.restore()
    1227  
    1228      async def start_tls(self, transport, protocol, sslcontext, *,
    1229                          server_side=False,
    1230                          server_hostname=None,
    1231                          ssl_handshake_timeout=None,
    1232                          ssl_shutdown_timeout=None):
    1233          """Upgrade transport to TLS.
    1234  
    1235          Return a new transport that *protocol* should start using
    1236          immediately.
    1237          """
    1238          if ssl is None:
    1239              raise RuntimeError('Python ssl module is not available')
    1240  
    1241          if not isinstance(sslcontext, ssl.SSLContext):
    1242              raise TypeError(
    1243                  f'sslcontext is expected to be an instance of ssl.SSLContext, '
    1244                  f'got {sslcontext!r}')
    1245  
    1246          if not getattr(transport, '_start_tls_compatible', False):
    1247              raise TypeError(
    1248                  f'transport {transport!r} is not supported by start_tls()')
    1249  
    1250          waiter = self.create_future()
    1251          ssl_protocol = sslproto.SSLProtocol(
    1252              self, protocol, sslcontext, waiter,
    1253              server_side, server_hostname,
    1254              ssl_handshake_timeout=ssl_handshake_timeout,
    1255              ssl_shutdown_timeout=ssl_shutdown_timeout,
    1256              call_connection_made=False)
    1257  
    1258          # Pause early so that "ssl_protocol.data_received()" doesn't
    1259          # have a chance to get called before "ssl_protocol.connection_made()".
    1260          transport.pause_reading()
    1261  
    1262          transport.set_protocol(ssl_protocol)
    1263          conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
    1264          resume_cb = self.call_soon(transport.resume_reading)
    1265  
    1266          try:
    1267              await waiter
    1268          except BaseException:
    1269              transport.close()
    1270              conmade_cb.cancel()
    1271              resume_cb.cancel()
    1272              raise
    1273  
    1274          return ssl_protocol._app_transport
    1275  
    1276      async def create_datagram_endpoint(self, protocol_factory,
    1277                                         local_addr=None, remote_addr=None, *,
    1278                                         family=0, proto=0, flags=0,
    1279                                         reuse_port=None,
    1280                                         allow_broadcast=None, sock=None):
    1281          """Create datagram connection."""
    1282          if sock is not None:
    1283              if sock.type != socket.SOCK_DGRAM:
    1284                  raise ValueError(
    1285                      f'A UDP Socket was expected, got {sock!r}')
    1286              if (local_addr or remote_addr or
    1287                      family or proto or flags or
    1288                      reuse_port or allow_broadcast):
    1289                  # show the problematic kwargs in exception msg
    1290                  opts = dict(local_addr=local_addr, remote_addr=remote_addr,
    1291                              family=family, proto=proto, flags=flags,
    1292                              reuse_port=reuse_port,
    1293                              allow_broadcast=allow_broadcast)
    1294                  problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
    1295                  raise ValueError(
    1296                      f'socket modifier keyword arguments can not be used '
    1297                      f'when sock is specified. ({problems})')
    1298              sock.setblocking(False)
    1299              r_addr = None
    1300          else:
    1301              if not (local_addr or remote_addr):
    1302                  if family == 0:
    1303                      raise ValueError('unexpected address family')
    1304                  addr_pairs_info = (((family, proto), (None, None)),)
    1305              elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
    1306                  for addr in (local_addr, remote_addr):
    1307                      if addr is not None and not isinstance(addr, str):
    1308                          raise TypeError('string is expected')
    1309  
    1310                  if local_addr and local_addr[0] not in (0, '\x00'):
    1311                      try:
    1312                          if stat.S_ISSOCK(os.stat(local_addr).st_mode):
    1313                              os.remove(local_addr)
    1314                      except FileNotFoundError:
    1315                          pass
    1316                      except OSError as err:
    1317                          # Directory may have permissions only to create socket.
    1318                          logger.error('Unable to check or remove stale UNIX '
    1319                                       'socket %r: %r',
    1320                                       local_addr, err)
    1321  
    1322                  addr_pairs_info = (((family, proto),
    1323                                      (local_addr, remote_addr)), )
    1324              else:
    1325                  # join address by (family, protocol)
    1326                  addr_infos = {}  # Using order preserving dict
    1327                  for idx, addr in ((0, local_addr), (1, remote_addr)):
    1328                      if addr is not None:
    1329                          if not (isinstance(addr, tuple) and len(addr) == 2):
    1330                              raise TypeError('2-tuple is expected')
    1331  
    1332                          infos = await self._ensure_resolved(
    1333                              addr, family=family, type=socket.SOCK_DGRAM,
    1334                              proto=proto, flags=flags, loop=self)
    1335                          if not infos:
    1336                              raise OSError('getaddrinfo() returned empty list')
    1337  
    1338                          for fam, _, pro, _, address in infos:
    1339                              key = (fam, pro)
    1340                              if key not in addr_infos:
    1341                                  addr_infos[key] = [None, None]
    1342                              addr_infos[key][idx] = address
    1343  
    1344                  # each addr has to have info for each (family, proto) pair
    1345                  addr_pairs_info = [
    1346                      (key, addr_pair) for key, addr_pair in addr_infos.items()
    1347                      if not ((local_addr and addr_pair[0] is None) or
    1348                              (remote_addr and addr_pair[1] is None))]
    1349  
    1350                  if not addr_pairs_info:
    1351                      raise ValueError('can not get address information')
    1352  
    1353              exceptions = []
    1354  
    1355              for ((family, proto),
    1356                   (local_address, remote_address)) in addr_pairs_info:
    1357                  sock = None
    1358                  r_addr = None
    1359                  try:
    1360                      sock = socket.socket(
    1361                          family=family, type=socket.SOCK_DGRAM, proto=proto)
    1362                      if reuse_port:
    1363                          _set_reuseport(sock)
    1364                      if allow_broadcast:
    1365                          sock.setsockopt(
    1366                              socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    1367                      sock.setblocking(False)
    1368  
    1369                      if local_addr:
    1370                          sock.bind(local_address)
    1371                      if remote_addr:
    1372                          if not allow_broadcast:
    1373                              await self.sock_connect(sock, remote_address)
    1374                          r_addr = remote_address
    1375                  except OSError as exc:
    1376                      if sock is not None:
    1377                          sock.close()
    1378                      exceptions.append(exc)
    1379                  except:
    1380                      if sock is not None:
    1381                          sock.close()
    1382                      raise
    1383                  else:
    1384                      break
    1385              else:
    1386                  raise exceptions[0]
    1387  
    1388          protocol = protocol_factory()
    1389          waiter = self.create_future()
    1390          transport = self._make_datagram_transport(
    1391              sock, protocol, r_addr, waiter)
    1392          if self._debug:
    1393              if local_addr:
    1394                  logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
    1395                              "created: (%r, %r)",
    1396                              local_addr, remote_addr, transport, protocol)
    1397              else:
    1398                  logger.debug("Datagram endpoint remote_addr=%r created: "
    1399                               "(%r, %r)",
    1400                               remote_addr, transport, protocol)
    1401  
    1402          try:
    1403              await waiter
    1404          except:
    1405              transport.close()
    1406              raise
    1407  
    1408          return transport, protocol
    1409  
    1410      async def _ensure_resolved(self, address, *,
    1411                                 family=0, type=socket.SOCK_STREAM,
    1412                                 proto=0, flags=0, loop):
    1413          host, port = address[:2]
    1414          info = _ipaddr_info(host, port, family, type, proto, *address[2:])
    1415          if info is not None:
    1416              # "host" is already a resolved IP.
    1417              return [info]
    1418          else:
    1419              return await loop.getaddrinfo(host, port, family=family, type=type,
    1420                                            proto=proto, flags=flags)
    1421  
    1422      async def _create_server_getaddrinfo(self, host, port, family, flags):
    1423          infos = await self._ensure_resolved((host, port), family=family,
    1424                                              type=socket.SOCK_STREAM,
    1425                                              flags=flags, loop=self)
    1426          if not infos:
    1427              raise OSError(f'getaddrinfo({host!r}) returned empty list')
    1428          return infos
    1429  
    1430      async def create_server(
    1431              self, protocol_factory, host=None, port=None,
    1432              *,
    1433              family=socket.AF_UNSPEC,
    1434              flags=socket.AI_PASSIVE,
    1435              sock=None,
    1436              backlog=100,
    1437              ssl=None,
    1438              reuse_address=None,
    1439              reuse_port=None,
    1440              ssl_handshake_timeout=None,
    1441              ssl_shutdown_timeout=None,
    1442              start_serving=True):
    1443          """Create a TCP server.
    1444  
    1445          The host parameter can be a string, in that case the TCP server is
    1446          bound to host and port.
    1447  
    1448          The host parameter can also be a sequence of strings and in that case
    1449          the TCP server is bound to all hosts of the sequence. If a host
    1450          appears multiple times (possibly indirectly e.g. when hostnames
    1451          resolve to the same IP address), the server is only bound once to that
    1452          host.
    1453  
    1454          Return a Server object which can be used to stop the service.
    1455  
    1456          This method is a coroutine.
    1457          """
    1458          if isinstance(ssl, bool):
    1459              raise TypeError('ssl argument must be an SSLContext or None')
    1460  
    1461          if ssl_handshake_timeout is not None and ssl is None:
    1462              raise ValueError(
    1463                  'ssl_handshake_timeout is only meaningful with ssl')
    1464  
    1465          if ssl_shutdown_timeout is not None and ssl is None:
    1466              raise ValueError(
    1467                  'ssl_shutdown_timeout is only meaningful with ssl')
    1468  
    1469          if sock is not None:
    1470              _check_ssl_socket(sock)
    1471  
    1472          if host is not None or port is not None:
    1473              if sock is not None:
    1474                  raise ValueError(
    1475                      'host/port and sock can not be specified at the same time')
    1476  
    1477              if reuse_address is None:
    1478                  reuse_address = os.name == "posix" and sys.platform != "cygwin"
    1479              sockets = []
    1480              if host == '':
    1481                  hosts = [None]
    1482              elif (isinstance(host, str) or
    1483                    not isinstance(host, collections.abc.Iterable)):
    1484                  hosts = [host]
    1485              else:
    1486                  hosts = host
    1487  
    1488              fs = [self._create_server_getaddrinfo(host, port, family=family,
    1489                                                    flags=flags)
    1490                    for host in hosts]
    1491              infos = await tasks.gather(*fs)
    1492              infos = set(itertools.chain.from_iterable(infos))
    1493  
    1494              completed = False
    1495              try:
    1496                  for res in infos:
    1497                      af, socktype, proto, canonname, sa = res
    1498                      try:
    1499                          sock = socket.socket(af, socktype, proto)
    1500                      except socket.error:
    1501                          # Assume it's a bad family/type/protocol combination.
    1502                          if self._debug:
    1503                              logger.warning('create_server() failed to create '
    1504                                             'socket.socket(%r, %r, %r)',
    1505                                             af, socktype, proto, exc_info=True)
    1506                          continue
    1507                      sockets.append(sock)
    1508                      if reuse_address:
    1509                          sock.setsockopt(
    1510                              socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    1511                      if reuse_port:
    1512                          _set_reuseport(sock)
    1513                      # Disable IPv4/IPv6 dual stack support (enabled by
    1514                      # default on Linux) which makes a single socket
    1515                      # listen on both address families.
    1516                      if (_HAS_IPv6 and
    1517                              af == socket.AF_INET6 and
    1518                              hasattr(socket, 'IPPROTO_IPV6')):
    1519                          sock.setsockopt(socket.IPPROTO_IPV6,
    1520                                          socket.IPV6_V6ONLY,
    1521                                          True)
    1522                      try:
    1523                          sock.bind(sa)
    1524                      except OSError as err:
    1525                          raise OSError(err.errno, 'error while attempting '
    1526                                        'to bind on address %r: %s'
    1527                                        % (sa, err.strerror.lower())) from None
    1528                  completed = True
    1529              finally:
    1530                  if not completed:
    1531                      for sock in sockets:
    1532                          sock.close()
    1533          else:
    1534              if sock is None:
    1535                  raise ValueError('Neither host/port nor sock were specified')
    1536              if sock.type != socket.SOCK_STREAM:
    1537                  raise ValueError(f'A Stream Socket was expected, got {sock!r}')
    1538              sockets = [sock]
    1539  
    1540          for sock in sockets:
    1541              sock.setblocking(False)
    1542  
    1543          server = Server(self, sockets, protocol_factory,
    1544                          ssl, backlog, ssl_handshake_timeout,
    1545                          ssl_shutdown_timeout)
    1546          if start_serving:
    1547              server._start_serving()
    1548              # Skip one loop iteration so that all 'loop.add_reader'
    1549              # go through.
    1550              await tasks.sleep(0)
    1551  
    1552          if self._debug:
    1553              logger.info("%r is serving", server)
    1554          return server
    1555  
    1556      async def connect_accepted_socket(
    1557              self, protocol_factory, sock,
    1558              *, ssl=None,
    1559              ssl_handshake_timeout=None,
    1560              ssl_shutdown_timeout=None):
    1561          if sock.type != socket.SOCK_STREAM:
    1562              raise ValueError(f'A Stream Socket was expected, got {sock!r}')
    1563  
    1564          if ssl_handshake_timeout is not None and not ssl:
    1565              raise ValueError(
    1566                  'ssl_handshake_timeout is only meaningful with ssl')
    1567  
    1568          if ssl_shutdown_timeout is not None and not ssl:
    1569              raise ValueError(
    1570                  'ssl_shutdown_timeout is only meaningful with ssl')
    1571  
    1572          if sock is not None:
    1573              _check_ssl_socket(sock)
    1574  
    1575          transport, protocol = await self._create_connection_transport(
    1576              sock, protocol_factory, ssl, '', server_side=True,
    1577              ssl_handshake_timeout=ssl_handshake_timeout,
    1578              ssl_shutdown_timeout=ssl_shutdown_timeout)
    1579          if self._debug:
    1580              # Get the socket from the transport because SSL transport closes
    1581              # the old socket and creates a new SSL socket
    1582              sock = transport.get_extra_info('socket')
    1583              logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
    1584          return transport, protocol
    1585  
    1586      async def connect_read_pipe(self, protocol_factory, pipe):
    1587          protocol = protocol_factory()
    1588          waiter = self.create_future()
    1589          transport = self._make_read_pipe_transport(pipe, protocol, waiter)
    1590  
    1591          try:
    1592              await waiter
    1593          except:
    1594              transport.close()
    1595              raise
    1596  
    1597          if self._debug:
    1598              logger.debug('Read pipe %r connected: (%r, %r)',
    1599                           pipe.fileno(), transport, protocol)
    1600          return transport, protocol
    1601  
    1602      async def connect_write_pipe(self, protocol_factory, pipe):
    1603          protocol = protocol_factory()
    1604          waiter = self.create_future()
    1605          transport = self._make_write_pipe_transport(pipe, protocol, waiter)
    1606  
    1607          try:
    1608              await waiter
    1609          except:
    1610              transport.close()
    1611              raise
    1612  
    1613          if self._debug:
    1614              logger.debug('Write pipe %r connected: (%r, %r)',
    1615                           pipe.fileno(), transport, protocol)
    1616          return transport, protocol
    1617  
    1618      def _log_subprocess(self, msg, stdin, stdout, stderr):
    1619          info = [msg]
    1620          if stdin is not None:
    1621              info.append(f'stdin={_format_pipe(stdin)}')
    1622          if stdout is not None and stderr == subprocess.STDOUT:
    1623              info.append(f'stdout=stderr={_format_pipe(stdout)}')
    1624          else:
    1625              if stdout is not None:
    1626                  info.append(f'stdout={_format_pipe(stdout)}')
    1627              if stderr is not None:
    1628                  info.append(f'stderr={_format_pipe(stderr)}')
    1629          logger.debug(' '.join(info))
    1630  
    1631      async def subprocess_shell(self, protocol_factory, cmd, *,
    1632                                 stdin=subprocess.PIPE,
    1633                                 stdout=subprocess.PIPE,
    1634                                 stderr=subprocess.PIPE,
    1635                                 universal_newlines=False,
    1636                                 shell=True, bufsize=0,
    1637                                 encoding=None, errors=None, text=None,
    1638                                 **kwargs):
    1639          if not isinstance(cmd, (bytes, str)):
    1640              raise ValueError("cmd must be a string")
    1641          if universal_newlines:
    1642              raise ValueError("universal_newlines must be False")
    1643          if not shell:
    1644              raise ValueError("shell must be True")
    1645          if bufsize != 0:
    1646              raise ValueError("bufsize must be 0")
    1647          if text:
    1648              raise ValueError("text must be False")
    1649          if encoding is not None:
    1650              raise ValueError("encoding must be None")
    1651          if errors is not None:
    1652              raise ValueError("errors must be None")
    1653  
    1654          protocol = protocol_factory()
    1655          debug_log = None
    1656          if self._debug:
    1657              # don't log parameters: they may contain sensitive information
    1658              # (password) and may be too long
    1659              debug_log = 'run shell command %r' % cmd
    1660              self._log_subprocess(debug_log, stdin, stdout, stderr)
    1661          transport = await self._make_subprocess_transport(
    1662              protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
    1663          if self._debug and debug_log is not None:
    1664              logger.info('%s: %r', debug_log, transport)
    1665          return transport, protocol
    1666  
    1667      async def subprocess_exec(self, protocol_factory, program, *args,
    1668                                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
    1669                                stderr=subprocess.PIPE, universal_newlines=False,
    1670                                shell=False, bufsize=0,
    1671                                encoding=None, errors=None, text=None,
    1672                                **kwargs):
    1673          if universal_newlines:
    1674              raise ValueError("universal_newlines must be False")
    1675          if shell:
    1676              raise ValueError("shell must be False")
    1677          if bufsize != 0:
    1678              raise ValueError("bufsize must be 0")
    1679          if text:
    1680              raise ValueError("text must be False")
    1681          if encoding is not None:
    1682              raise ValueError("encoding must be None")
    1683          if errors is not None:
    1684              raise ValueError("errors must be None")
    1685  
    1686          popen_args = (program,) + args
    1687          protocol = protocol_factory()
    1688          debug_log = None
    1689          if self._debug:
    1690              # don't log parameters: they may contain sensitive information
    1691              # (password) and may be too long
    1692              debug_log = f'execute program {program!r}'
    1693              self._log_subprocess(debug_log, stdin, stdout, stderr)
    1694          transport = await self._make_subprocess_transport(
    1695              protocol, popen_args, False, stdin, stdout, stderr,
    1696              bufsize, **kwargs)
    1697          if self._debug and debug_log is not None:
    1698              logger.info('%s: %r', debug_log, transport)
    1699          return transport, protocol
    1700  
    1701      def get_exception_handler(self):
    1702          """Return an exception handler, or None if the default one is in use.
    1703          """
    1704          return self._exception_handler
    1705  
    1706      def set_exception_handler(self, handler):
    1707          """Set handler as the new event loop exception handler.
    1708  
    1709          If handler is None, the default exception handler will
    1710          be set.
    1711  
    1712          If handler is a callable object, it should have a
    1713          signature matching '(loop, context)', where 'loop'
    1714          will be a reference to the active event loop, 'context'
    1715          will be a dict object (see `call_exception_handler()`
    1716          documentation for details about context).
    1717          """
    1718          if handler is not None and not callable(handler):
    1719              raise TypeError(f'A callable object or None is expected, '
    1720                              f'got {handler!r}')
    1721          self._exception_handler = handler
    1722  
    1723      def default_exception_handler(self, context):
    1724          """Default exception handler.
    1725  
    1726          This is called when an exception occurs and no exception
    1727          handler is set, and can be called by a custom exception
    1728          handler that wants to defer to the default behavior.
    1729  
    1730          This default handler logs the error message and other
    1731          context-dependent information.  In debug mode, a truncated
    1732          stack trace is also appended showing where the given object
    1733          (e.g. a handle or future or task) was created, if any.
    1734  
    1735          The context parameter has the same meaning as in
    1736          `call_exception_handler()`.
    1737          """
    1738          message = context.get('message')
    1739          if not message:
    1740              message = 'Unhandled exception in event loop'
    1741  
    1742          exception = context.get('exception')
    1743          if exception is not None:
    1744              exc_info = (type(exception), exception, exception.__traceback__)
    1745          else:
    1746              exc_info = False
    1747  
    1748          if ('source_traceback' not in context and
    1749                  self._current_handle is not None and
    1750                  self._current_handle._source_traceback):
    1751              context['handle_traceback'] = \
    1752                  self._current_handle._source_traceback
    1753  
    1754          log_lines = [message]
    1755          for key in sorted(context):
    1756              if key in {'message', 'exception'}:
    1757                  continue
    1758              value = context[key]
    1759              if key == 'source_traceback':
    1760                  tb = ''.join(traceback.format_list(value))
    1761                  value = 'Object created at (most recent call last):\n'
    1762                  value += tb.rstrip()
    1763              elif key == 'handle_traceback':
    1764                  tb = ''.join(traceback.format_list(value))
    1765                  value = 'Handle created at (most recent call last):\n'
    1766                  value += tb.rstrip()
    1767              else:
    1768                  value = repr(value)
    1769              log_lines.append(f'{key}: {value}')
    1770  
    1771          logger.error('\n'.join(log_lines), exc_info=exc_info)
    1772  
    1773      def call_exception_handler(self, context):
    1774          """Call the current event loop's exception handler.
    1775  
    1776          The context argument is a dict containing the following keys:
    1777  
    1778          - 'message': Error message;
    1779          - 'exception' (optional): Exception object;
    1780          - 'future' (optional): Future instance;
    1781          - 'task' (optional): Task instance;
    1782          - 'handle' (optional): Handle instance;
    1783          - 'protocol' (optional): Protocol instance;
    1784          - 'transport' (optional): Transport instance;
    1785          - 'socket' (optional): Socket instance;
    1786          - 'asyncgen' (optional): Asynchronous generator that caused
    1787                                   the exception.
    1788  
    1789          New keys maybe introduced in the future.
    1790  
    1791          Note: do not overload this method in an event loop subclass.
    1792          For custom exception handling, use the
    1793          `set_exception_handler()` method.
    1794          """
    1795          if self._exception_handler is None:
    1796              try:
    1797                  self.default_exception_handler(context)
    1798              except (SystemExit, KeyboardInterrupt):
    1799                  raise
    1800              except BaseException:
    1801                  # Second protection layer for unexpected errors
    1802                  # in the default implementation, as well as for subclassed
    1803                  # event loops with overloaded "default_exception_handler".
    1804                  logger.error('Exception in default exception handler',
    1805                               exc_info=True)
    1806          else:
    1807              try:
    1808                  self._exception_handler(self, context)
    1809              except (SystemExit, KeyboardInterrupt):
    1810                  raise
    1811              except BaseException as exc:
    1812                  # Exception in the user set custom exception handler.
    1813                  try:
    1814                      # Let's try default handler.
    1815                      self.default_exception_handler({
    1816                          'message': 'Unhandled error in exception handler',
    1817                          'exception': exc,
    1818                          'context': context,
    1819                      })
    1820                  except (SystemExit, KeyboardInterrupt):
    1821                      raise
    1822                  except BaseException:
    1823                      # Guard 'default_exception_handler' in case it is
    1824                      # overloaded.
    1825                      logger.error('Exception in default exception handler '
    1826                                   'while handling an unexpected error '
    1827                                   'in custom exception handler',
    1828                                   exc_info=True)
    1829  
    1830      def _add_callback(self, handle):
    1831          """Add a Handle to _ready."""
    1832          if not handle._cancelled:
    1833              self._ready.append(handle)
    1834  
    1835      def _add_callback_signalsafe(self, handle):
    1836          """Like _add_callback() but called from a signal handler."""
    1837          self._add_callback(handle)
    1838          self._write_to_self()
    1839  
    1840      def _timer_handle_cancelled(self, handle):
    1841          """Notification that a TimerHandle has been cancelled."""
    1842          if handle._scheduled:
    1843              self._timer_cancelled_count += 1
    1844  
    1845      def _run_once(self):
    1846          """Run one full iteration of the event loop.
    1847  
    1848          This calls all currently ready callbacks, polls for I/O,
    1849          schedules the resulting callbacks, and finally schedules
    1850          'call_later' callbacks.
    1851          """
    1852  
    1853          sched_count = len(self._scheduled)
    1854          if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
    1855              self._timer_cancelled_count / sched_count >
    1856                  _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
    1857              # Remove delayed calls that were cancelled if their number
    1858              # is too high
    1859              new_scheduled = []
    1860              for handle in self._scheduled:
    1861                  if handle._cancelled:
    1862                      handle._scheduled = False
    1863                  else:
    1864                      new_scheduled.append(handle)
    1865  
    1866              heapq.heapify(new_scheduled)
    1867              self._scheduled = new_scheduled
    1868              self._timer_cancelled_count = 0
    1869          else:
    1870              # Remove delayed calls that were cancelled from head of queue.
    1871              while self._scheduled and self._scheduled[0]._cancelled:
    1872                  self._timer_cancelled_count -= 1
    1873                  handle = heapq.heappop(self._scheduled)
    1874                  handle._scheduled = False
    1875  
    1876          timeout = None
    1877          if self._ready or self._stopping:
    1878              timeout = 0
    1879          elif self._scheduled:
    1880              # Compute the desired timeout.
    1881              when = self._scheduled[0]._when
    1882              timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
    1883  
    1884          event_list = self._selector.select(timeout)
    1885          self._process_events(event_list)
    1886          # Needed to break cycles when an exception occurs.
    1887          event_list = None
    1888  
    1889          # Handle 'later' callbacks that are ready.
    1890          end_time = self.time() + self._clock_resolution
    1891          while self._scheduled:
    1892              handle = self._scheduled[0]
    1893              if handle._when >= end_time:
    1894                  break
    1895              handle = heapq.heappop(self._scheduled)
    1896              handle._scheduled = False
    1897              self._ready.append(handle)
    1898  
    1899          # This is the only place where callbacks are actually *called*.
    1900          # All other places just add them to ready.
    1901          # Note: We run all currently scheduled callbacks, but not any
    1902          # callbacks scheduled by callbacks run this time around --
    1903          # they will be run the next time (after another I/O poll).
    1904          # Use an idiom that is thread-safe without using locks.
    1905          ntodo = len(self._ready)
    1906          for i in range(ntodo):
    1907              handle = self._ready.popleft()
    1908              if handle._cancelled:
    1909                  continue
    1910              if self._debug:
    1911                  try:
    1912                      self._current_handle = handle
    1913                      t0 = self.time()
    1914                      handle._run()
    1915                      dt = self.time() - t0
    1916                      if dt >= self.slow_callback_duration:
    1917                          logger.warning('Executing %s took %.3f seconds',
    1918                                         _format_handle(handle), dt)
    1919                  finally:
    1920                      self._current_handle = None
    1921              else:
    1922                  handle._run()
    1923          handle = None  # Needed to break cycles when an exception occurs.
    1924  
    1925      def _set_coroutine_origin_tracking(self, enabled):
    1926          if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
    1927              return
    1928  
    1929          if enabled:
    1930              self._coroutine_origin_tracking_saved_depth = (
    1931                  sys.get_coroutine_origin_tracking_depth())
    1932              sys.set_coroutine_origin_tracking_depth(
    1933                  constants.DEBUG_STACK_DEPTH)
    1934          else:
    1935              sys.set_coroutine_origin_tracking_depth(
    1936                  self._coroutine_origin_tracking_saved_depth)
    1937  
    1938          self._coroutine_origin_tracking_enabled = enabled
    1939  
    1940      def get_debug(self):
    1941          return self._debug
    1942  
    1943      def set_debug(self, enabled):
    1944          self._debug = enabled
    1945  
    1946          if self.is_running():
    1947              self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)