(root)/
Python-3.11.7/
Lib/
test/
test_asynchat.py
       1  # test asynchat
       2  
       3  from test import support
       4  from test.support import socket_helper
       5  from test.support import threading_helper
       6  from test.support import warnings_helper
       7  
       8  import errno
       9  import socket
      10  import sys
      11  import threading
      12  import time
      13  import unittest
      14  import unittest.mock
      15  
      16  
      17  asynchat = warnings_helper.import_deprecated('asynchat')
      18  asyncore = warnings_helper.import_deprecated('asyncore')
      19  
      20  support.requires_working_socket(module=True)
      21  
      22  HOST = socket_helper.HOST
      23  SERVER_QUIT = b'QUIT\n'
      24  
      25  
      26  class ESC[4;38;5;81mecho_server(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
      27      # parameter to determine the number of bytes passed back to the
      28      # client each send
      29      chunk_size = 1
      30  
      31      def __init__(self, event):
      32          threading.Thread.__init__(self)
      33          self.event = event
      34          self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      35          self.port = socket_helper.bind_port(self.sock)
      36          # This will be set if the client wants us to wait before echoing
      37          # data back.
      38          self.start_resend_event = None
      39  
      40      def run(self):
      41          self.sock.listen()
      42          self.event.set()
      43          conn, client = self.sock.accept()
      44          self.buffer = b""
      45          # collect data until quit message is seen
      46          while SERVER_QUIT not in self.buffer:
      47              data = conn.recv(1)
      48              if not data:
      49                  break
      50              self.buffer = self.buffer + data
      51  
      52          # remove the SERVER_QUIT message
      53          self.buffer = self.buffer.replace(SERVER_QUIT, b'')
      54  
      55          if self.start_resend_event:
      56              self.start_resend_event.wait()
      57  
      58          # re-send entire set of collected data
      59          try:
      60              # this may fail on some tests, such as test_close_when_done,
      61              # since the client closes the channel when it's done sending
      62              while self.buffer:
      63                  n = conn.send(self.buffer[:self.chunk_size])
      64                  time.sleep(0.001)
      65                  self.buffer = self.buffer[n:]
      66          except:
      67              pass
      68  
      69          conn.close()
      70          self.sock.close()
      71  
      72  class ESC[4;38;5;81mecho_client(ESC[4;38;5;149masynchatESC[4;38;5;149m.ESC[4;38;5;149masync_chat):
      73  
      74      def __init__(self, terminator, server_port):
      75          asynchat.async_chat.__init__(self)
      76          self.contents = []
      77          self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
      78          self.connect((HOST, server_port))
      79          self.set_terminator(terminator)
      80          self.buffer = b""
      81  
      82      def handle_connect(self):
      83          pass
      84  
      85      if sys.platform == 'darwin':
      86          # select.poll returns a select.POLLHUP at the end of the tests
      87          # on darwin, so just ignore it
      88          def handle_expt(self):
      89              pass
      90  
      91      def collect_incoming_data(self, data):
      92          self.buffer += data
      93  
      94      def found_terminator(self):
      95          self.contents.append(self.buffer)
      96          self.buffer = b""
      97  
      98  def start_echo_server():
      99      event = threading.Event()
     100      s = echo_server(event)
     101      s.start()
     102      event.wait()
     103      event.clear()
     104      time.sleep(0.01)   # Give server time to start accepting.
     105      return s, event
     106  
     107  
     108  class ESC[4;38;5;81mTestAsynchat(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     109      usepoll = False
     110  
     111      def setUp(self):
     112          self._threads = threading_helper.threading_setup()
     113  
     114      def tearDown(self):
     115          threading_helper.threading_cleanup(*self._threads)
     116  
     117      def line_terminator_check(self, term, server_chunk):
     118          event = threading.Event()
     119          s = echo_server(event)
     120          s.chunk_size = server_chunk
     121          s.start()
     122          event.wait()
     123          event.clear()
     124          time.sleep(0.01)   # Give server time to start accepting.
     125          c = echo_client(term, s.port)
     126          c.push(b"hello ")
     127          c.push(b"world" + term)
     128          c.push(b"I'm not dead yet!" + term)
     129          c.push(SERVER_QUIT)
     130          asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
     131          threading_helper.join_thread(s)
     132  
     133          self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
     134  
     135      # the line terminator tests below check receiving variously-sized
     136      # chunks back from the server in order to exercise all branches of
     137      # async_chat.handle_read
     138  
     139      def test_line_terminator1(self):
     140          # test one-character terminator
     141          for l in (1, 2, 3):
     142              self.line_terminator_check(b'\n', l)
     143  
     144      def test_line_terminator2(self):
     145          # test two-character terminator
     146          for l in (1, 2, 3):
     147              self.line_terminator_check(b'\r\n', l)
     148  
     149      def test_line_terminator3(self):
     150          # test three-character terminator
     151          for l in (1, 2, 3):
     152              self.line_terminator_check(b'qqq', l)
     153  
     154      def numeric_terminator_check(self, termlen):
     155          # Try reading a fixed number of bytes
     156          s, event = start_echo_server()
     157          c = echo_client(termlen, s.port)
     158          data = b"hello world, I'm not dead yet!\n"
     159          c.push(data)
     160          c.push(SERVER_QUIT)
     161          asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
     162          threading_helper.join_thread(s)
     163  
     164          self.assertEqual(c.contents, [data[:termlen]])
     165  
     166      def test_numeric_terminator1(self):
     167          # check that ints & longs both work (since type is
     168          # explicitly checked in async_chat.handle_read)
     169          self.numeric_terminator_check(1)
     170  
     171      def test_numeric_terminator2(self):
     172          self.numeric_terminator_check(6)
     173  
     174      def test_none_terminator(self):
     175          # Try reading a fixed number of bytes
     176          s, event = start_echo_server()
     177          c = echo_client(None, s.port)
     178          data = b"hello world, I'm not dead yet!\n"
     179          c.push(data)
     180          c.push(SERVER_QUIT)
     181          asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
     182          threading_helper.join_thread(s)
     183  
     184          self.assertEqual(c.contents, [])
     185          self.assertEqual(c.buffer, data)
     186  
     187      def test_simple_producer(self):
     188          s, event = start_echo_server()
     189          c = echo_client(b'\n', s.port)
     190          data = b"hello world\nI'm not dead yet!\n"
     191          p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
     192          c.push_with_producer(p)
     193          asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
     194          threading_helper.join_thread(s)
     195  
     196          self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
     197  
     198      def test_string_producer(self):
     199          s, event = start_echo_server()
     200          c = echo_client(b'\n', s.port)
     201          data = b"hello world\nI'm not dead yet!\n"
     202          c.push_with_producer(data+SERVER_QUIT)
     203          asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
     204          threading_helper.join_thread(s)
     205  
     206          self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
     207  
     208      def test_empty_line(self):
     209          # checks that empty lines are handled correctly
     210          s, event = start_echo_server()
     211          c = echo_client(b'\n', s.port)
     212          c.push(b"hello world\n\nI'm not dead yet!\n")
     213          c.push(SERVER_QUIT)
     214          asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
     215          threading_helper.join_thread(s)
     216  
     217          self.assertEqual(c.contents,
     218                           [b"hello world", b"", b"I'm not dead yet!"])
     219  
     220      def test_close_when_done(self):
     221          s, event = start_echo_server()
     222          s.start_resend_event = threading.Event()
     223          c = echo_client(b'\n', s.port)
     224          c.push(b"hello world\nI'm not dead yet!\n")
     225          c.push(SERVER_QUIT)
     226          c.close_when_done()
     227          asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
     228  
     229          # Only allow the server to start echoing data back to the client after
     230          # the client has closed its connection.  This prevents a race condition
     231          # where the server echoes all of its data before we can check that it
     232          # got any down below.
     233          s.start_resend_event.set()
     234          threading_helper.join_thread(s)
     235  
     236          self.assertEqual(c.contents, [])
     237          # the server might have been able to send a byte or two back, but this
     238          # at least checks that it received something and didn't just fail
     239          # (which could still result in the client not having received anything)
     240          self.assertGreater(len(s.buffer), 0)
     241  
     242      def test_push(self):
     243          # Issue #12523: push() should raise a TypeError if it doesn't get
     244          # a bytes string
     245          s, event = start_echo_server()
     246          c = echo_client(b'\n', s.port)
     247          data = b'bytes\n'
     248          c.push(data)
     249          c.push(bytearray(data))
     250          c.push(memoryview(data))
     251          self.assertRaises(TypeError, c.push, 10)
     252          self.assertRaises(TypeError, c.push, 'unicode')
     253          c.push(SERVER_QUIT)
     254          asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
     255          threading_helper.join_thread(s)
     256          self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes'])
     257  
     258  
     259  class ESC[4;38;5;81mTestAsynchat_WithPoll(ESC[4;38;5;149mTestAsynchat):
     260      usepoll = True
     261  
     262  
     263  class ESC[4;38;5;81mTestAsynchatMocked(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     264      def test_blockingioerror(self):
     265          # Issue #16133: handle_read() must ignore BlockingIOError
     266          sock = unittest.mock.Mock()
     267          sock.recv.side_effect = BlockingIOError(errno.EAGAIN)
     268  
     269          dispatcher = asynchat.async_chat()
     270          dispatcher.set_socket(sock)
     271          self.addCleanup(dispatcher.del_channel)
     272  
     273          with unittest.mock.patch.object(dispatcher, 'handle_error') as error:
     274              dispatcher.handle_read()
     275          self.assertFalse(error.called)
     276  
     277  
     278  class ESC[4;38;5;81mTestHelperFunctions(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     279      def test_find_prefix_at_end(self):
     280          self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
     281          self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
     282  
     283  
     284  class ESC[4;38;5;81mTestNotConnected(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     285      def test_disallow_negative_terminator(self):
     286          # Issue #11259
     287          client = asynchat.async_chat()
     288          self.assertRaises(ValueError, client.set_terminator, -1)
     289  
     290  
     291  
     292  if __name__ == "__main__":
     293      unittest.main()