(root)/
Python-3.11.7/
Lib/
asyncore.py
       1  # -*- Mode: Python -*-
       2  #   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
       3  #   Author: Sam Rushing <rushing@nightmare.com>
       4  
       5  # ======================================================================
       6  # Copyright 1996 by Sam Rushing
       7  #
       8  #                         All Rights Reserved
       9  #
      10  # Permission to use, copy, modify, and distribute this software and
      11  # its documentation for any purpose and without fee is hereby
      12  # granted, provided that the above copyright notice appear in all
      13  # copies and that both that copyright notice and this permission
      14  # notice appear in supporting documentation, and that the name of Sam
      15  # Rushing not be used in advertising or publicity pertaining to
      16  # distribution of the software without specific, written prior
      17  # permission.
      18  #
      19  # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
      20  # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
      21  # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
      22  # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
      23  # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
      24  # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
      25  # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
      26  # ======================================================================
      27  
      28  """Basic infrastructure for asynchronous socket service clients and servers.
      29  
      30  There are only two ways to have a program on a single processor do "more
      31  than one thing at a time".  Multi-threaded programming is the simplest and
      32  most popular way to do it, but there is another very different technique,
      33  that lets you have nearly all the advantages of multi-threading, without
      34  actually using multiple threads. it's really only practical if your program
      35  is largely I/O bound. If your program is CPU bound, then pre-emptive
      36  scheduled threads are probably what you really need. Network servers are
      37  rarely CPU-bound, however.
      38  
      39  If your operating system supports the select() system call in its I/O
      40  library (and nearly all do), then you can use it to juggle multiple
      41  communication channels at once; doing other work while your I/O is taking
      42  place in the "background."  Although this strategy can seem strange and
      43  complex, especially at first, it is in many ways easier to understand and
      44  control than multi-threaded programming. The module documented here solves
      45  many of the difficult problems for you, making the task of building
      46  sophisticated high-performance network servers and clients a snap.
      47  """
      48  
      49  import select
      50  import socket
      51  import sys
      52  import time
      53  import warnings
      54  
      55  import os
      56  from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
      57       ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
      58       errorcode
      59  
      60  _DEPRECATION_MSG = ('The {name} module is deprecated and will be removed in '
      61                      'Python {remove}. The recommended replacement is asyncio')
      62  warnings._deprecated(__name__, _DEPRECATION_MSG, remove=(3, 12))
      63  
      64  
      65  _DISCONNECTED = frozenset({ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
      66                             EBADF})
      67  
      68  try:
      69      socket_map
      70  except NameError:
      71      socket_map = {}
      72  
      73  def _strerror(err):
      74      try:
      75          return os.strerror(err)
      76      except (ValueError, OverflowError, NameError):
      77          if err in errorcode:
      78              return errorcode[err]
      79          return "Unknown error %s" %err
      80  
      81  class ESC[4;38;5;81mExitNow(ESC[4;38;5;149mException):
      82      pass
      83  
      84  _reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
      85  
      86  def read(obj):
      87      try:
      88          obj.handle_read_event()
      89      except _reraised_exceptions:
      90          raise
      91      except:
      92          obj.handle_error()
      93  
      94  def write(obj):
      95      try:
      96          obj.handle_write_event()
      97      except _reraised_exceptions:
      98          raise
      99      except:
     100          obj.handle_error()
     101  
     102  def _exception(obj):
     103      try:
     104          obj.handle_expt_event()
     105      except _reraised_exceptions:
     106          raise
     107      except:
     108          obj.handle_error()
     109  
     110  def readwrite(obj, flags):
     111      try:
     112          if flags & select.POLLIN:
     113              obj.handle_read_event()
     114          if flags & select.POLLOUT:
     115              obj.handle_write_event()
     116          if flags & select.POLLPRI:
     117              obj.handle_expt_event()
     118          if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
     119              obj.handle_close()
     120      except OSError as e:
     121          if e.errno not in _DISCONNECTED:
     122              obj.handle_error()
     123          else:
     124              obj.handle_close()
     125      except _reraised_exceptions:
     126          raise
     127      except:
     128          obj.handle_error()
     129  
     130  def poll(timeout=0.0, map=None):
     131      if map is None:
     132          map = socket_map
     133      if map:
     134          r = []; w = []; e = []
     135          for fd, obj in list(map.items()):
     136              is_r = obj.readable()
     137              is_w = obj.writable()
     138              if is_r:
     139                  r.append(fd)
     140              # accepting sockets should not be writable
     141              if is_w and not obj.accepting:
     142                  w.append(fd)
     143              if is_r or is_w:
     144                  e.append(fd)
     145          if [] == r == w == e:
     146              time.sleep(timeout)
     147              return
     148  
     149          r, w, e = select.select(r, w, e, timeout)
     150  
     151          for fd in r:
     152              obj = map.get(fd)
     153              if obj is None:
     154                  continue
     155              read(obj)
     156  
     157          for fd in w:
     158              obj = map.get(fd)
     159              if obj is None:
     160                  continue
     161              write(obj)
     162  
     163          for fd in e:
     164              obj = map.get(fd)
     165              if obj is None:
     166                  continue
     167              _exception(obj)
     168  
     169  def poll2(timeout=0.0, map=None):
     170      # Use the poll() support added to the select module in Python 2.0
     171      if map is None:
     172          map = socket_map
     173      if timeout is not None:
     174          # timeout is in milliseconds
     175          timeout = int(timeout*1000)
     176      pollster = select.poll()
     177      if map:
     178          for fd, obj in list(map.items()):
     179              flags = 0
     180              if obj.readable():
     181                  flags |= select.POLLIN | select.POLLPRI
     182              # accepting sockets should not be writable
     183              if obj.writable() and not obj.accepting:
     184                  flags |= select.POLLOUT
     185              if flags:
     186                  pollster.register(fd, flags)
     187  
     188          r = pollster.poll(timeout)
     189          for fd, flags in r:
     190              obj = map.get(fd)
     191              if obj is None:
     192                  continue
     193              readwrite(obj, flags)
     194  
     195  poll3 = poll2                           # Alias for backward compatibility
     196  
     197  def loop(timeout=30.0, use_poll=False, map=None, count=None):
     198      if map is None:
     199          map = socket_map
     200  
     201      if use_poll and hasattr(select, 'poll'):
     202          poll_fun = poll2
     203      else:
     204          poll_fun = poll
     205  
     206      if count is None:
     207          while map:
     208              poll_fun(timeout, map)
     209  
     210      else:
     211          while map and count > 0:
     212              poll_fun(timeout, map)
     213              count = count - 1
     214  
     215  class ESC[4;38;5;81mdispatcher:
     216  
     217      debug = False
     218      connected = False
     219      accepting = False
     220      connecting = False
     221      closing = False
     222      addr = None
     223      ignore_log_types = frozenset({'warning'})
     224  
     225      def __init__(self, sock=None, map=None):
     226          if map is None:
     227              self._map = socket_map
     228          else:
     229              self._map = map
     230  
     231          self._fileno = None
     232  
     233          if sock:
     234              # Set to nonblocking just to make sure for cases where we
     235              # get a socket from a blocking source.
     236              sock.setblocking(False)
     237              self.set_socket(sock, map)
     238              self.connected = True
     239              # The constructor no longer requires that the socket
     240              # passed be connected.
     241              try:
     242                  self.addr = sock.getpeername()
     243              except OSError as err:
     244                  if err.errno in (ENOTCONN, EINVAL):
     245                      # To handle the case where we got an unconnected
     246                      # socket.
     247                      self.connected = False
     248                  else:
     249                      # The socket is broken in some unknown way, alert
     250                      # the user and remove it from the map (to prevent
     251                      # polling of broken sockets).
     252                      self.del_channel(map)
     253                      raise
     254          else:
     255              self.socket = None
     256  
     257      def __repr__(self):
     258          status = [self.__class__.__module__+"."+self.__class__.__qualname__]
     259          if self.accepting and self.addr:
     260              status.append('listening')
     261          elif self.connected:
     262              status.append('connected')
     263          if self.addr is not None:
     264              try:
     265                  status.append('%s:%d' % self.addr)
     266              except TypeError:
     267                  status.append(repr(self.addr))
     268          return '<%s at %#x>' % (' '.join(status), id(self))
     269  
     270      def add_channel(self, map=None):
     271          #self.log_info('adding channel %s' % self)
     272          if map is None:
     273              map = self._map
     274          map[self._fileno] = self
     275  
     276      def del_channel(self, map=None):
     277          fd = self._fileno
     278          if map is None:
     279              map = self._map
     280          if fd in map:
     281              #self.log_info('closing channel %d:%s' % (fd, self))
     282              del map[fd]
     283          self._fileno = None
     284  
     285      def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
     286          self.family_and_type = family, type
     287          sock = socket.socket(family, type)
     288          sock.setblocking(False)
     289          self.set_socket(sock)
     290  
     291      def set_socket(self, sock, map=None):
     292          self.socket = sock
     293          self._fileno = sock.fileno()
     294          self.add_channel(map)
     295  
     296      def set_reuse_addr(self):
     297          # try to re-use a server port if possible
     298          try:
     299              self.socket.setsockopt(
     300                  socket.SOL_SOCKET, socket.SO_REUSEADDR,
     301                  self.socket.getsockopt(socket.SOL_SOCKET,
     302                                         socket.SO_REUSEADDR) | 1
     303                  )
     304          except OSError:
     305              pass
     306  
     307      # ==================================================
     308      # predicates for select()
     309      # these are used as filters for the lists of sockets
     310      # to pass to select().
     311      # ==================================================
     312  
     313      def readable(self):
     314          return True
     315  
     316      def writable(self):
     317          return True
     318  
     319      # ==================================================
     320      # socket object methods.
     321      # ==================================================
     322  
     323      def listen(self, num):
     324          self.accepting = True
     325          if os.name == 'nt' and num > 5:
     326              num = 5
     327          return self.socket.listen(num)
     328  
     329      def bind(self, addr):
     330          self.addr = addr
     331          return self.socket.bind(addr)
     332  
     333      def connect(self, address):
     334          self.connected = False
     335          self.connecting = True
     336          err = self.socket.connect_ex(address)
     337          if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
     338          or err == EINVAL and os.name == 'nt':
     339              self.addr = address
     340              return
     341          if err in (0, EISCONN):
     342              self.addr = address
     343              self.handle_connect_event()
     344          else:
     345              raise OSError(err, errorcode[err])
     346  
     347      def accept(self):
     348          # XXX can return either an address pair or None
     349          try:
     350              conn, addr = self.socket.accept()
     351          except TypeError:
     352              return None
     353          except OSError as why:
     354              if why.errno in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
     355                  return None
     356              else:
     357                  raise
     358          else:
     359              return conn, addr
     360  
     361      def send(self, data):
     362          try:
     363              result = self.socket.send(data)
     364              return result
     365          except OSError as why:
     366              if why.errno == EWOULDBLOCK:
     367                  return 0
     368              elif why.errno in _DISCONNECTED:
     369                  self.handle_close()
     370                  return 0
     371              else:
     372                  raise
     373  
     374      def recv(self, buffer_size):
     375          try:
     376              data = self.socket.recv(buffer_size)
     377              if not data:
     378                  # a closed connection is indicated by signaling
     379                  # a read condition, and having recv() return 0.
     380                  self.handle_close()
     381                  return b''
     382              else:
     383                  return data
     384          except OSError as why:
     385              # winsock sometimes raises ENOTCONN
     386              if why.errno in _DISCONNECTED:
     387                  self.handle_close()
     388                  return b''
     389              else:
     390                  raise
     391  
     392      def close(self):
     393          self.connected = False
     394          self.accepting = False
     395          self.connecting = False
     396          self.del_channel()
     397          if self.socket is not None:
     398              try:
     399                  self.socket.close()
     400              except OSError as why:
     401                  if why.errno not in (ENOTCONN, EBADF):
     402                      raise
     403  
     404      # log and log_info may be overridden to provide more sophisticated
     405      # logging and warning methods. In general, log is for 'hit' logging
     406      # and 'log_info' is for informational, warning and error logging.
     407  
     408      def log(self, message):
     409          sys.stderr.write('log: %s\n' % str(message))
     410  
     411      def log_info(self, message, type='info'):
     412          if type not in self.ignore_log_types:
     413              print('%s: %s' % (type, message))
     414  
     415      def handle_read_event(self):
     416          if self.accepting:
     417              # accepting sockets are never connected, they "spawn" new
     418              # sockets that are connected
     419              self.handle_accept()
     420          elif not self.connected:
     421              if self.connecting:
     422                  self.handle_connect_event()
     423              self.handle_read()
     424          else:
     425              self.handle_read()
     426  
     427      def handle_connect_event(self):
     428          err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
     429          if err != 0:
     430              raise OSError(err, _strerror(err))
     431          self.handle_connect()
     432          self.connected = True
     433          self.connecting = False
     434  
     435      def handle_write_event(self):
     436          if self.accepting:
     437              # Accepting sockets shouldn't get a write event.
     438              # We will pretend it didn't happen.
     439              return
     440  
     441          if not self.connected:
     442              if self.connecting:
     443                  self.handle_connect_event()
     444          self.handle_write()
     445  
     446      def handle_expt_event(self):
     447          # handle_expt_event() is called if there might be an error on the
     448          # socket, or if there is OOB data
     449          # check for the error condition first
     450          err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
     451          if err != 0:
     452              # we can get here when select.select() says that there is an
     453              # exceptional condition on the socket
     454              # since there is an error, we'll go ahead and close the socket
     455              # like we would in a subclassed handle_read() that received no
     456              # data
     457              self.handle_close()
     458          else:
     459              self.handle_expt()
     460  
     461      def handle_error(self):
     462          nil, t, v, tbinfo = compact_traceback()
     463  
     464          # sometimes a user repr method will crash.
     465          try:
     466              self_repr = repr(self)
     467          except:
     468              self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
     469  
     470          self.log_info(
     471              'uncaptured python exception, closing channel %s (%s:%s %s)' % (
     472                  self_repr,
     473                  t,
     474                  v,
     475                  tbinfo
     476                  ),
     477              'error'
     478              )
     479          self.handle_close()
     480  
     481      def handle_expt(self):
     482          self.log_info('unhandled incoming priority event', 'warning')
     483  
     484      def handle_read(self):
     485          self.log_info('unhandled read event', 'warning')
     486  
     487      def handle_write(self):
     488          self.log_info('unhandled write event', 'warning')
     489  
     490      def handle_connect(self):
     491          self.log_info('unhandled connect event', 'warning')
     492  
     493      def handle_accept(self):
     494          pair = self.accept()
     495          if pair is not None:
     496              self.handle_accepted(*pair)
     497  
     498      def handle_accepted(self, sock, addr):
     499          sock.close()
     500          self.log_info('unhandled accepted event', 'warning')
     501  
     502      def handle_close(self):
     503          self.log_info('unhandled close event', 'warning')
     504          self.close()
     505  
     506  # ---------------------------------------------------------------------------
     507  # adds simple buffered output capability, useful for simple clients.
     508  # [for more sophisticated usage use asynchat.async_chat]
     509  # ---------------------------------------------------------------------------
     510  
     511  class ESC[4;38;5;81mdispatcher_with_send(ESC[4;38;5;149mdispatcher):
     512  
     513      def __init__(self, sock=None, map=None):
     514          dispatcher.__init__(self, sock, map)
     515          self.out_buffer = b''
     516  
     517      def initiate_send(self):
     518          num_sent = 0
     519          num_sent = dispatcher.send(self, self.out_buffer[:65536])
     520          self.out_buffer = self.out_buffer[num_sent:]
     521  
     522      def handle_write(self):
     523          self.initiate_send()
     524  
     525      def writable(self):
     526          return (not self.connected) or len(self.out_buffer)
     527  
     528      def send(self, data):
     529          if self.debug:
     530              self.log_info('sending %s' % repr(data))
     531          self.out_buffer = self.out_buffer + data
     532          self.initiate_send()
     533  
     534  # ---------------------------------------------------------------------------
     535  # used for debugging.
     536  # ---------------------------------------------------------------------------
     537  
     538  def compact_traceback():
     539      t, v, tb = sys.exc_info()
     540      tbinfo = []
     541      if not tb: # Must have a traceback
     542          raise AssertionError("traceback does not exist")
     543      while tb:
     544          tbinfo.append((
     545              tb.tb_frame.f_code.co_filename,
     546              tb.tb_frame.f_code.co_name,
     547              str(tb.tb_lineno)
     548              ))
     549          tb = tb.tb_next
     550  
     551      # just to be safe
     552      del tb
     553  
     554      file, function, line = tbinfo[-1]
     555      info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
     556      return (file, function, line), t, v, info
     557  
     558  def close_all(map=None, ignore_all=False):
     559      if map is None:
     560          map = socket_map
     561      for x in list(map.values()):
     562          try:
     563              x.close()
     564          except OSError as x:
     565              if x.errno == EBADF:
     566                  pass
     567              elif not ignore_all:
     568                  raise
     569          except _reraised_exceptions:
     570              raise
     571          except:
     572              if not ignore_all:
     573                  raise
     574      map.clear()
     575  
     576  # Asynchronous File I/O:
     577  #
     578  # After a little research (reading man pages on various unixen, and
     579  # digging through the linux kernel), I've determined that select()
     580  # isn't meant for doing asynchronous file i/o.
     581  # Heartening, though - reading linux/mm/filemap.c shows that linux
     582  # supports asynchronous read-ahead.  So _MOST_ of the time, the data
     583  # will be sitting in memory for us already when we go to read it.
     584  #
     585  # What other OS's (besides NT) support async file i/o?  [VMS?]
     586  #
     587  # Regardless, this is useful for pipes, and stdin/stdout...
     588  
     589  if os.name == 'posix':
     590      class ESC[4;38;5;81mfile_wrapper:
     591          # Here we override just enough to make a file
     592          # look like a socket for the purposes of asyncore.
     593          # The passed fd is automatically os.dup()'d
     594  
     595          def __init__(self, fd):
     596              self.fd = os.dup(fd)
     597  
     598          def __del__(self):
     599              if self.fd >= 0:
     600                  warnings.warn("unclosed file %r" % self, ResourceWarning,
     601                                source=self)
     602              self.close()
     603  
     604          def recv(self, *args):
     605              return os.read(self.fd, *args)
     606  
     607          def send(self, *args):
     608              return os.write(self.fd, *args)
     609  
     610          def getsockopt(self, level, optname, buflen=None):
     611              if (level == socket.SOL_SOCKET and
     612                  optname == socket.SO_ERROR and
     613                  not buflen):
     614                  return 0
     615              raise NotImplementedError("Only asyncore specific behaviour "
     616                                        "implemented.")
     617  
     618          read = recv
     619          write = send
     620  
     621          def close(self):
     622              if self.fd < 0:
     623                  return
     624              fd = self.fd
     625              self.fd = -1
     626              os.close(fd)
     627  
     628          def fileno(self):
     629              return self.fd
     630  
     631      class ESC[4;38;5;81mfile_dispatcher(ESC[4;38;5;149mdispatcher):
     632  
     633          def __init__(self, fd, map=None):
     634              dispatcher.__init__(self, None, map)
     635              self.connected = True
     636              try:
     637                  fd = fd.fileno()
     638              except AttributeError:
     639                  pass
     640              self.set_file(fd)
     641              # set it to non-blocking mode
     642              os.set_blocking(fd, False)
     643  
     644          def set_file(self, fd):
     645              self.socket = file_wrapper(fd)
     646              self._fileno = self.socket.fileno()
     647              self.add_channel()