1 # test asynchat
2
3 from test import support
4 from test.support import socket_helper
5 from test.support import threading_helper
6 from test.support import warnings_helper
7
8 import errno
9 import socket
10 import sys
11 import threading
12 import time
13 import unittest
14 import unittest.mock
15
16
# asynchat and asyncore are deprecated; import them through the helper so
# the DeprecationWarning raised at import time does not fail the test run.
asynchat = warnings_helper.import_deprecated('asynchat')
asyncore = warnings_helper.import_deprecated('asyncore')

# Every test in this module needs real sockets.
support.requires_working_socket(module=True)

# Address the echo client connects to.
HOST = socket_helper.HOST
# Marker the client sends to tell the echo server to stop collecting data.
SERVER_QUIT = b'QUIT\n'
24
25
class echo_server(threading.Thread):
    """Single-connection echo server thread used by the asynchat tests.

    The server accepts one client, reads until SERVER_QUIT shows up in the
    collected data (or the peer closes the connection), strips the quit
    marker and then sends the remaining data back ``chunk_size`` bytes at a
    time.  After the thread finishes, ``buffer`` holds whatever data was
    not sent back.
    """

    # parameter to determine the number of bytes passed back to the
    # client each send
    chunk_size = 1

    def __init__(self, event):
        """Create the listening socket; *event* is set once listening."""
        super().__init__()
        self.event = event
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = socket_helper.bind_port(self.sock)
        # This will be set if the client wants us to wait before echoing
        # data back.
        self.start_resend_event = None

    def run(self):
        """Serve exactly one client: collect, optionally wait, echo back."""
        # The context managers guarantee that both sockets are closed even
        # if accepting or receiving fails part-way through.
        with self.sock:
            self.sock.listen()
            self.event.set()
            conn, _ = self.sock.accept()
            with conn:
                self.buffer = b""
                # collect data until quit message is seen
                while SERVER_QUIT not in self.buffer:
                    data = conn.recv(1)
                    if not data:
                        # peer closed the connection before sending QUIT
                        break
                    self.buffer = self.buffer + data

                # remove the SERVER_QUIT message
                self.buffer = self.buffer.replace(SERVER_QUIT, b'')

                if self.start_resend_event:
                    self.start_resend_event.wait()

                # re-send entire set of collected data
                try:
                    # this may fail on some tests, such as
                    # test_close_when_done, since the client closes the
                    # channel when it's done sending
                    while self.buffer:
                        n = conn.send(self.buffer[:self.chunk_size])
                        time.sleep(0.001)
                        self.buffer = self.buffer[n:]
                except OSError:
                    # Only socket-level failures are expected here; anything
                    # else (e.g. KeyboardInterrupt) must not be swallowed.
                    pass
71
class echo_client(asynchat.async_chat):
    """async_chat client that records every terminator-delimited message.

    Incoming data accumulates in ``buffer``; each time the terminator is
    found the accumulated bytes are moved into the ``contents`` list.
    """

    def __init__(self, terminator, server_port):
        super().__init__()
        # Completed messages, and the message currently being assembled.
        self.contents = []
        self.buffer = b""
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((HOST, server_port))
        self.set_terminator(terminator)

    def handle_connect(self):
        # Nothing to do once the connection is established.
        pass

    if sys.platform == 'darwin':
        # select.poll returns a select.POLLHUP at the end of the tests
        # on darwin, so just ignore it
        def handle_expt(self):
            pass

    def collect_incoming_data(self, data):
        # Append the received chunk to the message being assembled.
        self.buffer = self.buffer + data

    def found_terminator(self):
        # Store the finished message and start a new, empty one.
        message, self.buffer = self.buffer, b""
        self.contents.append(message)
97
def start_echo_server():
    """Start an echo_server thread and wait until it reports readiness.

    Returns a ``(server, event)`` pair: the running server thread and the
    (already cleared) event that was used for the start-up handshake.
    """
    ready = threading.Event()
    server = echo_server(ready)
    server.start()
    # Wait for the server thread to set the event, then reset it.
    ready.wait()
    ready.clear()
    time.sleep(0.01)   # Give server time to start accepting.
    return server, ready
106
107
class TestAsynchat(unittest.TestCase):
    """Round-trip tests for async_chat against the threaded echo server.

    ``usepoll`` is forwarded to asyncore.loop() as ``use_poll``; subclasses
    can override it to run the same tests with the other setting.
    """

    usepoll = False

    def setUp(self):
        self._threads = threading_helper.threading_setup()

    def tearDown(self):
        threading_helper.threading_cleanup(*self._threads)

    def _run_loop(self):
        # Drive the asyncore loop with the settings shared by every test.
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)

    def line_terminator_check(self, term, server_chunk):
        """Echo two *term*-terminated lines and check how they are split.

        *server_chunk* becomes the server's ``chunk_size``, i.e. how many
        bytes it passes back to the client per send.
        """
        ready = threading.Event()
        server = echo_server(ready)
        server.chunk_size = server_chunk
        server.start()
        ready.wait()
        ready.clear()
        time.sleep(0.01)   # Give server time to start accepting.
        client = echo_client(term, server.port)
        payloads = (
            b"hello ",
            b"world" + term,
            b"I'm not dead yet!" + term,
            SERVER_QUIT,
        )
        for payload in payloads:
            client.push(payload)
        self._run_loop()
        threading_helper.join_thread(server)

        expected = [b"hello world", b"I'm not dead yet!"]
        self.assertEqual(client.contents, expected)

    # the line terminator tests below check receiving variously-sized
    # chunks back from the server in order to exercise all branches of
    # async_chat.handle_read

    def test_line_terminator1(self):
        # test one-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'\n', chunk)

    def test_line_terminator2(self):
        # test two-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'\r\n', chunk)

    def test_line_terminator3(self):
        # test three-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'qqq', chunk)

    def numeric_terminator_check(self, termlen):
        """Use the integer *termlen* as terminator and check the result.

        Only the first *termlen* bytes of the echoed data are expected to
        end up in the client's ``contents``.
        """
        server, _ = start_echo_server()
        client = echo_client(termlen, server.port)
        data = b"hello world, I'm not dead yet!\n"
        client.push(data)
        client.push(SERVER_QUIT)
        self._run_loop()
        threading_helper.join_thread(server)

        self.assertEqual(client.contents, [data[:termlen]])

    def test_numeric_terminator1(self):
        # integer terminator of length one (the terminator's type is
        # explicitly checked in async_chat.handle_read)
        self.numeric_terminator_check(1)

    def test_numeric_terminator2(self):
        # integer terminator longer than one byte
        self.numeric_terminator_check(6)

    def test_none_terminator(self):
        # With a None terminator no message is ever completed: everything
        # echoed back stays in the client's buffer.
        server, _ = start_echo_server()
        client = echo_client(None, server.port)
        data = b"hello world, I'm not dead yet!\n"
        client.push(data)
        client.push(SERVER_QUIT)
        self._run_loop()
        threading_helper.join_thread(server)

        self.assertEqual(client.contents, [])
        self.assertEqual(client.buffer, data)

    def test_simple_producer(self):
        # feed the data through a simple_producer with a small buffer
        server, _ = start_echo_server()
        client = echo_client(b'\n', server.port)
        data = b"hello world\nI'm not dead yet!\n"
        producer = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
        client.push_with_producer(producer)
        self._run_loop()
        threading_helper.join_thread(server)

        expected = [b"hello world", b"I'm not dead yet!"]
        self.assertEqual(client.contents, expected)

    def test_string_producer(self):
        # hand plain bytes to push_with_producer instead of a producer
        server, _ = start_echo_server()
        client = echo_client(b'\n', server.port)
        data = b"hello world\nI'm not dead yet!\n"
        client.push_with_producer(data+SERVER_QUIT)
        self._run_loop()
        threading_helper.join_thread(server)

        expected = [b"hello world", b"I'm not dead yet!"]
        self.assertEqual(client.contents, expected)

    def test_empty_line(self):
        # checks that empty lines are handled correctly
        server, _ = start_echo_server()
        client = echo_client(b'\n', server.port)
        client.push(b"hello world\n\nI'm not dead yet!\n")
        client.push(SERVER_QUIT)
        self._run_loop()
        threading_helper.join_thread(server)

        expected = [b"hello world", b"", b"I'm not dead yet!"]
        self.assertEqual(client.contents, expected)

    def test_close_when_done(self):
        server, _ = start_echo_server()
        resend_gate = threading.Event()
        server.start_resend_event = resend_gate
        client = echo_client(b'\n', server.port)
        client.push(b"hello world\nI'm not dead yet!\n")
        client.push(SERVER_QUIT)
        client.close_when_done()
        self._run_loop()

        # Only allow the server to start echoing data back to the client after
        # the client has closed its connection.  This prevents a race condition
        # where the server echoes all of its data before we can check that it
        # got any down below.
        resend_gate.set()
        threading_helper.join_thread(server)

        self.assertEqual(client.contents, [])
        # the server might have been able to send a byte or two back, but this
        # at least checks that it received something and didn't just fail
        # (which could still result in the client not having received anything)
        self.assertGreater(len(server.buffer), 0)

    def test_push(self):
        # Issue #12523: push() should raise a TypeError if it doesn't get
        # a bytes string
        server, _ = start_echo_server()
        client = echo_client(b'\n', server.port)
        data = b'bytes\n'
        for accepted in (data, bytearray(data), memoryview(data)):
            client.push(accepted)
        for rejected in (10, 'unicode'):
            with self.assertRaises(TypeError):
                client.push(rejected)
        client.push(SERVER_QUIT)
        self._run_loop()
        threading_helper.join_thread(server)
        self.assertEqual(client.contents, [b'bytes'] * 3)
257
258
class TestAsynchat_WithPoll(TestAsynchat):
    """Run every TestAsynchat test again with ``usepoll`` switched on."""

    usepoll = True
261
262
class TestAsynchatMocked(unittest.TestCase):
    """async_chat tests that use a mocked socket instead of a real one."""

    def test_blockingioerror(self):
        # Issue #16133: handle_read() must ignore BlockingIOError
        fake_sock = unittest.mock.Mock()
        fake_sock.recv.side_effect = BlockingIOError(errno.EAGAIN)

        channel = asynchat.async_chat()
        channel.set_socket(fake_sock)
        self.addCleanup(channel.del_channel)

        # Replace handle_error so we can tell whether it was invoked.
        patcher = unittest.mock.patch.object(channel, 'handle_error')
        with patcher as handle_error:
            channel.handle_read()
        self.assertFalse(handle_error.called)
276
277
class TestHelperFunctions(unittest.TestCase):
    """Tests for module-level helper functions of asynchat."""

    def test_find_prefix_at_end(self):
        # (haystack, needle, expected result)
        cases = (
            ("qwerty\r", "\r\n", 1),
            ("qwertydkjf", "\r\n", 0),
        )
        for haystack, needle, expected in cases:
            found = asynchat.find_prefix_at_end(haystack, needle)
            self.assertEqual(found, expected)
282
283
class TestNotConnected(unittest.TestCase):
    """Tests that need an async_chat object but no connection."""

    def test_disallow_negative_terminator(self):
        # Issue #11259
        channel = asynchat.async_chat()
        with self.assertRaises(ValueError):
            channel.set_terminator(-1)
289
290
291
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()