(root)/
Python-3.12.0/
Lib/
test/
support/
asyncore.py
       1  # TODO: This module was deprecated and removed from CPython 3.12
       2  # Now it is a test-only helper. Any attempts to rewrite exising tests that
       3  # are using this module and remove it completely are appreciated!
       4  # See: https://github.com/python/cpython/issues/72719
       5  
       6  # -*- Mode: Python -*-
       7  #   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
       8  #   Author: Sam Rushing <rushing@nightmare.com>
       9  
      10  # ======================================================================
      11  # Copyright 1996 by Sam Rushing
      12  #
      13  #                         All Rights Reserved
      14  #
      15  # Permission to use, copy, modify, and distribute this software and
      16  # its documentation for any purpose and without fee is hereby
      17  # granted, provided that the above copyright notice appear in all
      18  # copies and that both that copyright notice and this permission
      19  # notice appear in supporting documentation, and that the name of Sam
      20  # Rushing not be used in advertising or publicity pertaining to
      21  # distribution of the software without specific, written prior
      22  # permission.
      23  #
      24  # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
      25  # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
      26  # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
      27  # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
      28  # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
      29  # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
      30  # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
      31  # ======================================================================
      32  
      33  """Basic infrastructure for asynchronous socket service clients and servers.
      34  
      35  There are only two ways to have a program on a single processor do "more
      36  than one thing at a time".  Multi-threaded programming is the simplest and
      37  most popular way to do it, but there is another very different technique,
      38  that lets you have nearly all the advantages of multi-threading, without
      39  actually using multiple threads. it's really only practical if your program
      40  is largely I/O bound. If your program is CPU bound, then pre-emptive
      41  scheduled threads are probably what you really need. Network servers are
      42  rarely CPU-bound, however.
      43  
      44  If your operating system supports the select() system call in its I/O
      45  library (and nearly all do), then you can use it to juggle multiple
      46  communication channels at once; doing other work while your I/O is taking
      47  place in the "background."  Although this strategy can seem strange and
      48  complex, especially at first, it is in many ways easier to understand and
      49  control than multi-threaded programming. The module documented here solves
      50  many of the difficult problems for you, making the task of building
      51  sophisticated high-performance network servers and clients a snap.
      52  """
      53  
      54  import select
      55  import socket
      56  import sys
      57  import time
      58  import warnings
      59  
      60  import os
      61  from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
      62       ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
      63       errorcode
      64  
      65  
      66  _DISCONNECTED = frozenset({ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
      67                             EBADF})
      68  
      69  try:
      70      socket_map
      71  except NameError:
      72      socket_map = {}
      73  
      74  def _strerror(err):
      75      try:
      76          return os.strerror(err)
      77      except (ValueError, OverflowError, NameError):
      78          if err in errorcode:
      79              return errorcode[err]
      80          return "Unknown error %s" %err
      81  
      82  class ESC[4;38;5;81mExitNow(ESC[4;38;5;149mException):
      83      pass
      84  
      85  _reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
      86  
      87  def read(obj):
      88      try:
      89          obj.handle_read_event()
      90      except _reraised_exceptions:
      91          raise
      92      except:
      93          obj.handle_error()
      94  
      95  def write(obj):
      96      try:
      97          obj.handle_write_event()
      98      except _reraised_exceptions:
      99          raise
     100      except:
     101          obj.handle_error()
     102  
     103  def _exception(obj):
     104      try:
     105          obj.handle_expt_event()
     106      except _reraised_exceptions:
     107          raise
     108      except:
     109          obj.handle_error()
     110  
     111  def readwrite(obj, flags):
     112      try:
     113          if flags & select.POLLIN:
     114              obj.handle_read_event()
     115          if flags & select.POLLOUT:
     116              obj.handle_write_event()
     117          if flags & select.POLLPRI:
     118              obj.handle_expt_event()
     119          if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
     120              obj.handle_close()
     121      except OSError as e:
     122          if e.errno not in _DISCONNECTED:
     123              obj.handle_error()
     124          else:
     125              obj.handle_close()
     126      except _reraised_exceptions:
     127          raise
     128      except:
     129          obj.handle_error()
     130  
     131  def poll(timeout=0.0, map=None):
     132      if map is None:
     133          map = socket_map
     134      if map:
     135          r = []; w = []; e = []
     136          for fd, obj in list(map.items()):
     137              is_r = obj.readable()
     138              is_w = obj.writable()
     139              if is_r:
     140                  r.append(fd)
     141              # accepting sockets should not be writable
     142              if is_w and not obj.accepting:
     143                  w.append(fd)
     144              if is_r or is_w:
     145                  e.append(fd)
     146          if [] == r == w == e:
     147              time.sleep(timeout)
     148              return
     149  
     150          r, w, e = select.select(r, w, e, timeout)
     151  
     152          for fd in r:
     153              obj = map.get(fd)
     154              if obj is None:
     155                  continue
     156              read(obj)
     157  
     158          for fd in w:
     159              obj = map.get(fd)
     160              if obj is None:
     161                  continue
     162              write(obj)
     163  
     164          for fd in e:
     165              obj = map.get(fd)
     166              if obj is None:
     167                  continue
     168              _exception(obj)
     169  
     170  def poll2(timeout=0.0, map=None):
     171      # Use the poll() support added to the select module in Python 2.0
     172      if map is None:
     173          map = socket_map
     174      if timeout is not None:
     175          # timeout is in milliseconds
     176          timeout = int(timeout*1000)
     177      pollster = select.poll()
     178      if map:
     179          for fd, obj in list(map.items()):
     180              flags = 0
     181              if obj.readable():
     182                  flags |= select.POLLIN | select.POLLPRI
     183              # accepting sockets should not be writable
     184              if obj.writable() and not obj.accepting:
     185                  flags |= select.POLLOUT
     186              if flags:
     187                  pollster.register(fd, flags)
     188  
     189          r = pollster.poll(timeout)
     190          for fd, flags in r:
     191              obj = map.get(fd)
     192              if obj is None:
     193                  continue
     194              readwrite(obj, flags)
     195  
     196  poll3 = poll2                           # Alias for backward compatibility
     197  
     198  def loop(timeout=30.0, use_poll=False, map=None, count=None):
     199      if map is None:
     200          map = socket_map
     201  
     202      if use_poll and hasattr(select, 'poll'):
     203          poll_fun = poll2
     204      else:
     205          poll_fun = poll
     206  
     207      if count is None:
     208          while map:
     209              poll_fun(timeout, map)
     210  
     211      else:
     212          while map and count > 0:
     213              poll_fun(timeout, map)
     214              count = count - 1
     215  
     216  class ESC[4;38;5;81mdispatcher:
     217  
     218      debug = False
     219      connected = False
     220      accepting = False
     221      connecting = False
     222      closing = False
     223      addr = None
     224      ignore_log_types = frozenset({'warning'})
     225  
     226      def __init__(self, sock=None, map=None):
     227          if map is None:
     228              self._map = socket_map
     229          else:
     230              self._map = map
     231  
     232          self._fileno = None
     233  
     234          if sock:
     235              # Set to nonblocking just to make sure for cases where we
     236              # get a socket from a blocking source.
     237              sock.setblocking(False)
     238              self.set_socket(sock, map)
     239              self.connected = True
     240              # The constructor no longer requires that the socket
     241              # passed be connected.
     242              try:
     243                  self.addr = sock.getpeername()
     244              except OSError as err:
     245                  if err.errno in (ENOTCONN, EINVAL):
     246                      # To handle the case where we got an unconnected
     247                      # socket.
     248                      self.connected = False
     249                  else:
     250                      # The socket is broken in some unknown way, alert
     251                      # the user and remove it from the map (to prevent
     252                      # polling of broken sockets).
     253                      self.del_channel(map)
     254                      raise
     255          else:
     256              self.socket = None
     257  
     258      def __repr__(self):
     259          status = [self.__class__.__module__+"."+self.__class__.__qualname__]
     260          if self.accepting and self.addr:
     261              status.append('listening')
     262          elif self.connected:
     263              status.append('connected')
     264          if self.addr is not None:
     265              try:
     266                  status.append('%s:%d' % self.addr)
     267              except TypeError:
     268                  status.append(repr(self.addr))
     269          return '<%s at %#x>' % (' '.join(status), id(self))
     270  
     271      def add_channel(self, map=None):
     272          #self.log_info('adding channel %s' % self)
     273          if map is None:
     274              map = self._map
     275          map[self._fileno] = self
     276  
     277      def del_channel(self, map=None):
     278          fd = self._fileno
     279          if map is None:
     280              map = self._map
     281          if fd in map:
     282              #self.log_info('closing channel %d:%s' % (fd, self))
     283              del map[fd]
     284          self._fileno = None
     285  
     286      def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
     287          self.family_and_type = family, type
     288          sock = socket.socket(family, type)
     289          sock.setblocking(False)
     290          self.set_socket(sock)
     291  
     292      def set_socket(self, sock, map=None):
     293          self.socket = sock
     294          self._fileno = sock.fileno()
     295          self.add_channel(map)
     296  
     297      def set_reuse_addr(self):
     298          # try to re-use a server port if possible
     299          try:
     300              self.socket.setsockopt(
     301                  socket.SOL_SOCKET, socket.SO_REUSEADDR,
     302                  self.socket.getsockopt(socket.SOL_SOCKET,
     303                                         socket.SO_REUSEADDR) | 1
     304                  )
     305          except OSError:
     306              pass
     307  
     308      # ==================================================
     309      # predicates for select()
     310      # these are used as filters for the lists of sockets
     311      # to pass to select().
     312      # ==================================================
     313  
     314      def readable(self):
     315          return True
     316  
     317      def writable(self):
     318          return True
     319  
     320      # ==================================================
     321      # socket object methods.
     322      # ==================================================
     323  
     324      def listen(self, num):
     325          self.accepting = True
     326          if os.name == 'nt' and num > 5:
     327              num = 5
     328          return self.socket.listen(num)
     329  
     330      def bind(self, addr):
     331          self.addr = addr
     332          return self.socket.bind(addr)
     333  
     334      def connect(self, address):
     335          self.connected = False
     336          self.connecting = True
     337          err = self.socket.connect_ex(address)
     338          if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
     339          or err == EINVAL and os.name == 'nt':
     340              self.addr = address
     341              return
     342          if err in (0, EISCONN):
     343              self.addr = address
     344              self.handle_connect_event()
     345          else:
     346              raise OSError(err, errorcode[err])
     347  
     348      def accept(self):
     349          # XXX can return either an address pair or None
     350          try:
     351              conn, addr = self.socket.accept()
     352          except TypeError:
     353              return None
     354          except OSError as why:
     355              if why.errno in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
     356                  return None
     357              else:
     358                  raise
     359          else:
     360              return conn, addr
     361  
     362      def send(self, data):
     363          try:
     364              result = self.socket.send(data)
     365              return result
     366          except OSError as why:
     367              if why.errno == EWOULDBLOCK:
     368                  return 0
     369              elif why.errno in _DISCONNECTED:
     370                  self.handle_close()
     371                  return 0
     372              else:
     373                  raise
     374  
     375      def recv(self, buffer_size):
     376          try:
     377              data = self.socket.recv(buffer_size)
     378              if not data:
     379                  # a closed connection is indicated by signaling
     380                  # a read condition, and having recv() return 0.
     381                  self.handle_close()
     382                  return b''
     383              else:
     384                  return data
     385          except OSError as why:
     386              # winsock sometimes raises ENOTCONN
     387              if why.errno in _DISCONNECTED:
     388                  self.handle_close()
     389                  return b''
     390              else:
     391                  raise
     392  
     393      def close(self):
     394          self.connected = False
     395          self.accepting = False
     396          self.connecting = False
     397          self.del_channel()
     398          if self.socket is not None:
     399              try:
     400                  self.socket.close()
     401              except OSError as why:
     402                  if why.errno not in (ENOTCONN, EBADF):
     403                      raise
     404  
     405      # log and log_info may be overridden to provide more sophisticated
     406      # logging and warning methods. In general, log is for 'hit' logging
     407      # and 'log_info' is for informational, warning and error logging.
     408  
     409      def log(self, message):
     410          sys.stderr.write('log: %s\n' % str(message))
     411  
     412      def log_info(self, message, type='info'):
     413          if type not in self.ignore_log_types:
     414              print('%s: %s' % (type, message))
     415  
     416      def handle_read_event(self):
     417          if self.accepting:
     418              # accepting sockets are never connected, they "spawn" new
     419              # sockets that are connected
     420              self.handle_accept()
     421          elif not self.connected:
     422              if self.connecting:
     423                  self.handle_connect_event()
     424              self.handle_read()
     425          else:
     426              self.handle_read()
     427  
     428      def handle_connect_event(self):
     429          err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
     430          if err != 0:
     431              raise OSError(err, _strerror(err))
     432          self.handle_connect()
     433          self.connected = True
     434          self.connecting = False
     435  
     436      def handle_write_event(self):
     437          if self.accepting:
     438              # Accepting sockets shouldn't get a write event.
     439              # We will pretend it didn't happen.
     440              return
     441  
     442          if not self.connected:
     443              if self.connecting:
     444                  self.handle_connect_event()
     445          self.handle_write()
     446  
     447      def handle_expt_event(self):
     448          # handle_expt_event() is called if there might be an error on the
     449          # socket, or if there is OOB data
     450          # check for the error condition first
     451          err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
     452          if err != 0:
     453              # we can get here when select.select() says that there is an
     454              # exceptional condition on the socket
     455              # since there is an error, we'll go ahead and close the socket
     456              # like we would in a subclassed handle_read() that received no
     457              # data
     458              self.handle_close()
     459          else:
     460              self.handle_expt()
     461  
     462      def handle_error(self):
     463          nil, t, v, tbinfo = compact_traceback()
     464  
     465          # sometimes a user repr method will crash.
     466          try:
     467              self_repr = repr(self)
     468          except:
     469              self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
     470  
     471          self.log_info(
     472              'uncaptured python exception, closing channel %s (%s:%s %s)' % (
     473                  self_repr,
     474                  t,
     475                  v,
     476                  tbinfo
     477                  ),
     478              'error'
     479              )
     480          self.handle_close()
     481  
     482      def handle_expt(self):
     483          self.log_info('unhandled incoming priority event', 'warning')
     484  
     485      def handle_read(self):
     486          self.log_info('unhandled read event', 'warning')
     487  
     488      def handle_write(self):
     489          self.log_info('unhandled write event', 'warning')
     490  
     491      def handle_connect(self):
     492          self.log_info('unhandled connect event', 'warning')
     493  
     494      def handle_accept(self):
     495          pair = self.accept()
     496          if pair is not None:
     497              self.handle_accepted(*pair)
     498  
     499      def handle_accepted(self, sock, addr):
     500          sock.close()
     501          self.log_info('unhandled accepted event', 'warning')
     502  
     503      def handle_close(self):
     504          self.log_info('unhandled close event', 'warning')
     505          self.close()
     506  
     507  # ---------------------------------------------------------------------------
     508  # adds simple buffered output capability, useful for simple clients.
     509  # [for more sophisticated usage use asynchat.async_chat]
     510  # ---------------------------------------------------------------------------
     511  
     512  class ESC[4;38;5;81mdispatcher_with_send(ESC[4;38;5;149mdispatcher):
     513  
     514      def __init__(self, sock=None, map=None):
     515          dispatcher.__init__(self, sock, map)
     516          self.out_buffer = b''
     517  
     518      def initiate_send(self):
     519          num_sent = 0
     520          num_sent = dispatcher.send(self, self.out_buffer[:65536])
     521          self.out_buffer = self.out_buffer[num_sent:]
     522  
     523      def handle_write(self):
     524          self.initiate_send()
     525  
     526      def writable(self):
     527          return (not self.connected) or len(self.out_buffer)
     528  
     529      def send(self, data):
     530          if self.debug:
     531              self.log_info('sending %s' % repr(data))
     532          self.out_buffer = self.out_buffer + data
     533          self.initiate_send()
     534  
     535  # ---------------------------------------------------------------------------
     536  # used for debugging.
     537  # ---------------------------------------------------------------------------
     538  
     539  def compact_traceback():
     540      exc = sys.exception()
     541      tb = exc.__traceback__
     542      if not tb: # Must have a traceback
     543          raise AssertionError("traceback does not exist")
     544      tbinfo = []
     545      while tb:
     546          tbinfo.append((
     547              tb.tb_frame.f_code.co_filename,
     548              tb.tb_frame.f_code.co_name,
     549              str(tb.tb_lineno)
     550              ))
     551          tb = tb.tb_next
     552  
     553      # just to be safe
     554      del tb
     555  
     556      file, function, line = tbinfo[-1]
     557      info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
     558      return (file, function, line), type(exc), exc, info
     559  
     560  def close_all(map=None, ignore_all=False):
     561      if map is None:
     562          map = socket_map
     563      for x in list(map.values()):
     564          try:
     565              x.close()
     566          except OSError as x:
     567              if x.errno == EBADF:
     568                  pass
     569              elif not ignore_all:
     570                  raise
     571          except _reraised_exceptions:
     572              raise
     573          except:
     574              if not ignore_all:
     575                  raise
     576      map.clear()
     577  
     578  # Asynchronous File I/O:
     579  #
     580  # After a little research (reading man pages on various unixen, and
     581  # digging through the linux kernel), I've determined that select()
     582  # isn't meant for doing asynchronous file i/o.
     583  # Heartening, though - reading linux/mm/filemap.c shows that linux
     584  # supports asynchronous read-ahead.  So _MOST_ of the time, the data
     585  # will be sitting in memory for us already when we go to read it.
     586  #
     587  # What other OS's (besides NT) support async file i/o?  [VMS?]
     588  #
     589  # Regardless, this is useful for pipes, and stdin/stdout...
     590  
     591  if os.name == 'posix':
     592      class ESC[4;38;5;81mfile_wrapper:
     593          # Here we override just enough to make a file
     594          # look like a socket for the purposes of asyncore.
     595          # The passed fd is automatically os.dup()'d
     596  
     597          def __init__(self, fd):
     598              self.fd = os.dup(fd)
     599  
     600          def __del__(self):
     601              if self.fd >= 0:
     602                  warnings.warn("unclosed file %r" % self, ResourceWarning,
     603                                source=self)
     604              self.close()
     605  
     606          def recv(self, *args):
     607              return os.read(self.fd, *args)
     608  
     609          def send(self, *args):
     610              return os.write(self.fd, *args)
     611  
     612          def getsockopt(self, level, optname, buflen=None):
     613              if (level == socket.SOL_SOCKET and
     614                  optname == socket.SO_ERROR and
     615                  not buflen):
     616                  return 0
     617              raise NotImplementedError("Only asyncore specific behaviour "
     618                                        "implemented.")
     619  
     620          read = recv
     621          write = send
     622  
     623          def close(self):
     624              if self.fd < 0:
     625                  return
     626              fd = self.fd
     627              self.fd = -1
     628              os.close(fd)
     629  
     630          def fileno(self):
     631              return self.fd
     632  
     633      class ESC[4;38;5;81mfile_dispatcher(ESC[4;38;5;149mdispatcher):
     634  
     635          def __init__(self, fd, map=None):
     636              dispatcher.__init__(self, None, map)
     637              self.connected = True
     638              try:
     639                  fd = fd.fileno()
     640              except AttributeError:
     641                  pass
     642              self.set_file(fd)
     643              # set it to non-blocking mode
     644              os.set_blocking(fd, False)
     645  
     646          def set_file(self, fd):
     647              self.socket = file_wrapper(fd)
     648              self._fileno = self.socket.fileno()
     649              self.add_channel()