(root)/
Python-3.11.7/
Lib/
asyncio/
trsock.py
       1  import socket
       2  
       3  
       4  class ESC[4;38;5;81mTransportSocket:
       5  
       6      """A socket-like wrapper for exposing real transport sockets.
       7  
       8      These objects can be safely returned by APIs like
       9      `transport.get_extra_info('socket')`.  All potentially disruptive
      10      operations (like "socket.close()") are banned.
      11      """
      12  
      13      __slots__ = ('_sock',)
      14  
      15      def __init__(self, sock: socket.socket):
      16          self._sock = sock
      17  
      18      @property
      19      def family(self):
      20          return self._sock.family
      21  
      22      @property
      23      def type(self):
      24          return self._sock.type
      25  
      26      @property
      27      def proto(self):
      28          return self._sock.proto
      29  
      30      def __repr__(self):
      31          s = (
      32              f"<asyncio.TransportSocket fd={self.fileno()}, "
      33              f"family={self.family!s}, type={self.type!s}, "
      34              f"proto={self.proto}"
      35          )
      36  
      37          if self.fileno() != -1:
      38              try:
      39                  laddr = self.getsockname()
      40                  if laddr:
      41                      s = f"{s}, laddr={laddr}"
      42              except socket.error:
      43                  pass
      44              try:
      45                  raddr = self.getpeername()
      46                  if raddr:
      47                      s = f"{s}, raddr={raddr}"
      48              except socket.error:
      49                  pass
      50  
      51          return f"{s}>"
      52  
      53      def __getstate__(self):
      54          raise TypeError("Cannot serialize asyncio.TransportSocket object")
      55  
      56      def fileno(self):
      57          return self._sock.fileno()
      58  
      59      def dup(self):
      60          return self._sock.dup()
      61  
      62      def get_inheritable(self):
      63          return self._sock.get_inheritable()
      64  
      65      def shutdown(self, how):
      66          # asyncio doesn't currently provide a high-level transport API
      67          # to shutdown the connection.
      68          self._sock.shutdown(how)
      69  
      70      def getsockopt(self, *args, **kwargs):
      71          return self._sock.getsockopt(*args, **kwargs)
      72  
      73      def setsockopt(self, *args, **kwargs):
      74          self._sock.setsockopt(*args, **kwargs)
      75  
      76      def getpeername(self):
      77          return self._sock.getpeername()
      78  
      79      def getsockname(self):
      80          return self._sock.getsockname()
      81  
      82      def getsockbyname(self):
      83          return self._sock.getsockbyname()
      84  
      85      def settimeout(self, value):
      86          if value == 0:
      87              return
      88          raise ValueError(
      89              'settimeout(): only 0 timeout is allowed on transport sockets')
      90  
      91      def gettimeout(self):
      92          return 0
      93  
      94      def setblocking(self, flag):
      95          if not flag:
      96              return
      97          raise ValueError(
      98              'setblocking(): transport sockets cannot be blocking')