1 """Tests for streams.py."""
2
3 import gc
4 import os
5 import queue
6 import pickle
7 import socket
8 import sys
9 import threading
10 import unittest
11 from unittest import mock
12 from test.support import socket_helper
13 try:
14 import ssl
15 except ImportError:
16 ssl = None
17
18 import asyncio
19 from test.test_asyncio import utils as test_utils
20
21
def tearDownModule():
    # Module-level teardown: pass None to set_event_loop_policy() so the
    # policy used while this module's tests ran is not left installed.
    asyncio.set_event_loop_policy(None)
24
25
26 class ESC[4;38;5;81mStreamTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
27
28 DATA = b'line1\nline2\nline3\n'
29
def setUp(self):
    # Create a fresh event loop for the test, keep it on self.loop and
    # register it through the base class's set_event_loop().
    super().setUp()
    fresh_loop = asyncio.new_event_loop()
    self.loop = fresh_loop
    self.set_event_loop(fresh_loop)
34
def tearDown(self):
    # Run the loop briefly first, just in case transport close callbacks
    # are still pending.
    test_utils.run_briefly(self.loop)

    # set_event_loop() takes care of closing self.loop in a safe way
    super().tearDown()
41
def _basetest_open_connection(self, open_connection_fut):
    # Shared body for the open-connection tests: send a GET request over
    # the connection produced by open_connection_fut, check the status line
    # and the end of the response, and verify that nothing was reported to
    # the loop's exception handler.
    run = self.loop.run_until_complete
    errors = []
    self.loop.set_exception_handler(lambda loop, ctx: errors.append(ctx))
    reader, writer = run(open_connection_fut)
    writer.write(b'GET / HTTP/1.0\r\n\r\n')
    status_line = run(reader.readline())
    self.assertEqual(status_line, b'HTTP/1.0 200 OK\r\n')
    rest = run(reader.read())
    self.assertTrue(rest.endswith(b'\r\n\r\nTest message'))
    writer.close()
    self.assertEqual(errors, [])
55
def test_open_connection(self):
    # Run the shared open-connection scenario against the TCP test server.
    with test_utils.run_test_server() as httpd:
        pending = asyncio.open_connection(*httpd.address)
        self._basetest_open_connection(pending)
60
@socket_helper.skip_unless_bind_unix_socket
def test_open_unix_connection(self):
    # Run the shared open-connection scenario against the Unix-socket
    # test server.
    with test_utils.run_test_unix_server() as httpd:
        pending = asyncio.open_unix_connection(httpd.address)
        self._basetest_open_connection(pending)
66
def _basetest_open_connection_no_loop_ssl(self, open_connection_fut):
    # Shared body for the *_no_loop_ssl tests.  The current event loop is
    # set to None right after the connection attempt (whether or not it
    # succeeded); the rest of the exchange is driven through self.loop.
    test_loop = self.loop
    errors = []
    test_loop.set_exception_handler(lambda loop, ctx: errors.append(ctx))
    try:
        reader, writer = test_loop.run_until_complete(open_connection_fut)
    finally:
        asyncio.set_event_loop(None)
    writer.write(b'GET / HTTP/1.0\r\n\r\n')
    response = test_loop.run_until_complete(reader.read())
    self.assertTrue(response.endswith(b'\r\n\r\nTest message'))

    writer.close()
    self.assertEqual(errors, [])
81
@unittest.skipIf(ssl is None, 'No ssl module')
def test_open_connection_no_loop_ssl(self):
    # Run the shared no-loop SSL scenario against the SSL-enabled TCP
    # test server.
    with test_utils.run_test_server(use_ssl=True) as httpd:
        host_port = httpd.address
        client_ctx = test_utils.dummy_ssl_context()
        pending = asyncio.open_connection(*host_port, ssl=client_ctx)

        self._basetest_open_connection_no_loop_ssl(pending)
90
@socket_helper.skip_unless_bind_unix_socket
@unittest.skipIf(ssl is None, 'No ssl module')
def test_open_unix_connection_no_loop_ssl(self):
    # Run the shared no-loop SSL scenario against the SSL-enabled
    # Unix-socket test server; server_hostname is passed as ''.
    with test_utils.run_test_unix_server(use_ssl=True) as httpd:
        socket_path = httpd.address
        client_ctx = test_utils.dummy_ssl_context()
        pending = asyncio.open_unix_connection(
            socket_path, ssl=client_ctx, server_hostname='')

        self._basetest_open_connection_no_loop_ssl(pending)
102
def _basetest_open_connection_error(self, open_connection_fut):
    # Shared body for the *_connection_error tests: report a lost
    # connection carrying ZeroDivisionError to the writer's protocol and
    # check that reading then raises ZeroDivisionError, without anything
    # reaching the loop's exception handler.
    errors = []
    self.loop.set_exception_handler(lambda loop, ctx: errors.append(ctx))
    reader, writer = self.loop.run_until_complete(open_connection_fut)
    writer._protocol.connection_lost(ZeroDivisionError())
    pending_read = reader.read()
    self.assertRaises(
        ZeroDivisionError, self.loop.run_until_complete, pending_read)
    writer.close()
    test_utils.run_briefly(self.loop)
    self.assertEqual(errors, [])
114
def test_open_connection_error(self):
    # Run the shared connection-error scenario against the TCP test server.
    with test_utils.run_test_server() as httpd:
        pending = asyncio.open_connection(*httpd.address)
        self._basetest_open_connection_error(pending)
119
@socket_helper.skip_unless_bind_unix_socket
def test_open_unix_connection_error(self):
    # Run the shared connection-error scenario against the Unix-socket
    # test server.
    with test_utils.run_test_unix_server() as httpd:
        pending = asyncio.open_unix_connection(httpd.address)
        self._basetest_open_connection_error(pending)
125
def test_feed_empty_data(self):
    # After feeding an empty chunk the internal buffer must equal b''.
    reader = asyncio.StreamReader(loop=self.loop)

    reader.feed_data(b'')
    self.assertEqual(b'', reader._buffer)
131
def test_feed_nonempty_data(self):
    # After feeding DATA the internal buffer must equal DATA.
    reader = asyncio.StreamReader(loop=self.loop)

    payload = self.DATA
    reader.feed_data(payload)
    self.assertEqual(payload, reader._buffer)
137
def test_read_zero(self):
    # Read zero bytes: the result is b'' and the buffer is left untouched.
    reader = asyncio.StreamReader(loop=self.loop)
    reader.feed_data(self.DATA)

    got = self.loop.run_until_complete(reader.read(0))
    self.assertEqual(b'', got)
    self.assertEqual(self.DATA, reader._buffer)
146
def test_read(self):
    # Read bytes.  The data is fed from a callback that is scheduled
    # after the read task has been created.
    reader = asyncio.StreamReader(loop=self.loop)
    pending_read = self.loop.create_task(reader.read(30))
    self.loop.call_soon(reader.feed_data, self.DATA)

    got = self.loop.run_until_complete(pending_read)
    self.assertEqual(self.DATA, got)
    self.assertEqual(b'', reader._buffer)
159
def test_read_line_breaks(self):
    # Read bytes without line breaks: two chunks are fed and read(5) must
    # return exactly the first one, leaving the second in the buffer.
    reader = asyncio.StreamReader(loop=self.loop)
    for chunk in (b'line1', b'line2'):
        reader.feed_data(chunk)

    got = self.loop.run_until_complete(reader.read(5))

    self.assertEqual(b'line1', got)
    self.assertEqual(b'line2', reader._buffer)
170
def test_read_eof(self):
    # Read bytes, stop at eof.  EOF is fed from a callback scheduled
    # after the read task has been created.
    reader = asyncio.StreamReader(loop=self.loop)
    pending_read = self.loop.create_task(reader.read(1024))
    self.loop.call_soon(reader.feed_eof)

    got = self.loop.run_until_complete(pending_read)
    self.assertEqual(b'', got)
    self.assertEqual(b'', reader._buffer)
183
def test_read_until_eof(self):
    # Read all bytes until eof: read(-1) must return both chunks joined.
    reader = asyncio.StreamReader(loop=self.loop)
    pending_read = self.loop.create_task(reader.read(-1))

    def feed_chunks_then_eof():
        for chunk in (b'chunk1\n', b'chunk2'):
            reader.feed_data(chunk)
        reader.feed_eof()
    self.loop.call_soon(feed_chunks_then_eof)

    got = self.loop.run_until_complete(pending_read)

    self.assertEqual(b'chunk1\nchunk2', got)
    self.assertEqual(b'', reader._buffer)
199
def test_read_exception(self):
    # A read succeeds first; once an exception has been set on the
    # reader, the next read(2) must raise it.
    run = self.loop.run_until_complete
    reader = asyncio.StreamReader(loop=self.loop)
    reader.feed_data(b'line\n')

    got = run(reader.read(2))
    self.assertEqual(b'li', got)

    reader.set_exception(ValueError())
    with self.assertRaises(ValueError):
        run(reader.read(2))
210
def test_invalid_limit(self):
    # Constructing a StreamReader with limit 0 or -1 must raise a
    # ValueError whose message matches 'imit'.
    for bad_limit in (0, -1):
        with self.assertRaisesRegex(ValueError, 'imit'):
            asyncio.StreamReader(limit=bad_limit, loop=self.loop)
217
def test_read_limit(self):
    # With limit=3, read(5) must still return the whole 5-byte chunk.
    reader = asyncio.StreamReader(limit=3, loop=self.loop)
    reader.feed_data(b'chunk')
    got = self.loop.run_until_complete(reader.read(5))
    self.assertEqual(b'chunk', got)
    self.assertEqual(b'', reader._buffer)
224
def test_readline(self):
    # Read one line. 'readline' will need to wait for the data
    # to come from the scheduled callback.
    reader = asyncio.StreamReader(loop=self.loop)
    reader.feed_data(b'chunk1 ')
    pending_read = self.loop.create_task(reader.readline())

    def feed_rest():
        for chunk in (b'chunk2 ', b'chunk3 ', b'\n chunk4'):
            reader.feed_data(chunk)
    self.loop.call_soon(feed_rest)

    got = self.loop.run_until_complete(pending_read)
    self.assertEqual(b'chunk1 chunk2 chunk3 \n', got)
    self.assertEqual(b' chunk4', reader._buffer)
241
def test_readline_limit_with_existing_data(self):
    # Read one line. The data is in StreamReader's buffer
    # before the event loop is run.
    run = self.loop.run_until_complete

    reader = asyncio.StreamReader(limit=3, loop=self.loop)
    for chunk in (b'li', b'ne1\nline2\n'):
        reader.feed_data(chunk)

    with self.assertRaises(ValueError):
        run(reader.readline())
    # The buffer should contain the remaining data after exception
    self.assertEqual(b'line2\n', reader._buffer)

    reader = asyncio.StreamReader(limit=3, loop=self.loop)
    for chunk in (b'li', b'ne1', b'li'):
        reader.feed_data(chunk)

    with self.assertRaises(ValueError):
        run(reader.readline())
    # No b'\n' at the end. The 'limit' is set to 3. So before
    # waiting for the new data in buffer, 'readline' will consume
    # the entire buffer, and since the length of the consumed data
    # is more than 3, it will raise a ValueError. The buffer is
    # expected to be empty now.
    self.assertEqual(b'', reader._buffer)
268
def test_at_eof(self):
    # at_eof() must stay false while only data has been fed, and become
    # true once EOF has been fed and the last buffered line has been read.
    run = self.loop.run_until_complete
    reader = asyncio.StreamReader(loop=self.loop)
    self.assertFalse(reader.at_eof())

    reader.feed_data(b'some data\n')
    self.assertFalse(reader.at_eof())

    run(reader.readline())
    self.assertFalse(reader.at_eof())

    reader.feed_data(b'some data\n')
    reader.feed_eof()
    run(reader.readline())
    self.assertTrue(reader.at_eof())
283
def test_readline_limit(self):
    # Read one line. StreamReaders are fed with data after
    # their 'readline' methods are called.

    # Case 1: a single over-long line followed by EOF.
    stream = asyncio.StreamReader(limit=7, loop=self.loop)
    def cb():
        stream.feed_data(b'chunk1')
        stream.feed_data(b'chunk2')
        stream.feed_data(b'chunk3\n')
        stream.feed_eof()
    self.loop.call_soon(cb)

    self.assertRaises(
        ValueError, self.loop.run_until_complete, stream.readline())
    # The buffer had just one line of data, and after raising
    # a ValueError it should be empty.
    self.assertEqual(b'', stream._buffer)

    # Case 2: an over-long first line followed by a second line; the
    # second line is expected to remain in the buffer.
    stream = asyncio.StreamReader(limit=7, loop=self.loop)
    def cb():
        stream.feed_data(b'chunk1')
        stream.feed_data(b'chunk2\n')
        stream.feed_data(b'chunk3\n')
        stream.feed_eof()
    self.loop.call_soon(cb)

    self.assertRaises(
        ValueError, self.loop.run_until_complete, stream.readline())
    self.assertEqual(b'chunk3\n', stream._buffer)

    # check strictness of the limit
    stream = asyncio.StreamReader(limit=7, loop=self.loop)
    stream.feed_data(b'1234567\n')
    line = self.loop.run_until_complete(stream.readline())
    self.assertEqual(b'1234567\n', line)
    self.assertEqual(b'', stream._buffer)

    # The context managers below are not bound to a name: the raised
    # exception itself is never inspected.
    stream.feed_data(b'12345678\n')
    with self.assertRaises(ValueError):
        self.loop.run_until_complete(stream.readline())
    self.assertEqual(b'', stream._buffer)

    stream.feed_data(b'12345678')
    with self.assertRaises(ValueError):
        self.loop.run_until_complete(stream.readline())
    self.assertEqual(b'', stream._buffer)
330
def test_readline_nolimit_nowait(self):
    # All needed data for the first 'readline' call will be
    # in the buffer.
    reader = asyncio.StreamReader(loop=self.loop)
    head, tail = self.DATA[:6], self.DATA[6:]
    reader.feed_data(head)
    reader.feed_data(tail)

    first_line = self.loop.run_until_complete(reader.readline())

    self.assertEqual(b'line1\n', first_line)
    self.assertEqual(b'line2\nline3\n', reader._buffer)
342
def test_readline_eof(self):
    # Data without a newline followed by EOF: readline() must return the
    # data as is.
    reader = asyncio.StreamReader(loop=self.loop)
    reader.feed_data(b'some data')
    reader.feed_eof()

    got = self.loop.run_until_complete(reader.readline())
    self.assertEqual(b'some data', got)
350
def test_readline_empty_eof(self):
    # EOF with no data at all: readline() must return b''.
    reader = asyncio.StreamReader(loop=self.loop)
    reader.feed_eof()

    got = self.loop.run_until_complete(reader.readline())
    self.assertEqual(b'', got)
357
def test_readline_read_byte_count(self):
    # After one readline(), read(7) must return the next seven bytes and
    # leave the remainder in the buffer.
    run = self.loop.run_until_complete
    reader = asyncio.StreamReader(loop=self.loop)
    reader.feed_data(self.DATA)

    run(reader.readline())

    got = run(reader.read(7))

    self.assertEqual(b'line2\nl', got)
    self.assertEqual(b'ine3\n', reader._buffer)
368
def test_readline_exception(self):
    # A readline() succeeds first; once an exception has been set on the
    # reader, the next readline() must raise it and the buffer is empty.
    run = self.loop.run_until_complete
    reader = asyncio.StreamReader(loop=self.loop)
    reader.feed_data(b'line\n')

    got = run(reader.readline())
    self.assertEqual(b'line\n', got)

    reader.set_exception(ValueError())
    with self.assertRaises(ValueError):
        run(reader.readline())
    self.assertEqual(b'', reader._buffer)
380
def test_readuntil_separator(self):
    # readuntil() with an empty separator must raise a ValueError whose
    # message matches 'Separator should be'.
    reader = asyncio.StreamReader(loop=self.loop)
    with self.assertRaisesRegex(ValueError, 'Separator should be'):
        pending = reader.readuntil(separator=b'')
        self.loop.run_until_complete(pending)
385
def test_readuntil_multi_chunks(self):
    run = self.loop.run_until_complete
    reader = asyncio.StreamReader(loop=self.loop)

    # Separator given by keyword.
    reader.feed_data(b'lineAAA')
    got = run(reader.readuntil(separator=b'AAA'))
    self.assertEqual(b'lineAAA', got)
    self.assertEqual(b'', reader._buffer)

    # Separator given positionally.
    reader.feed_data(b'lineAAA')
    got = run(reader.readuntil(b'AAA'))
    self.assertEqual(b'lineAAA', got)
    self.assertEqual(b'', reader._buffer)

    # Bytes that follow the separator must remain in the buffer.
    reader.feed_data(b'lineAAAxxx')
    got = run(reader.readuntil(b'AAA'))
    self.assertEqual(b'lineAAA', got)
    self.assertEqual(b'xxx', reader._buffer)
403
def test_readuntil_multi_chunks_1(self):
    # Each case feeds the listed chunks into the same reader, in order,
    # then expects readuntil(b'aaa') to return `expected` and to leave
    # the buffer empty.
    cases = (
        ((b'QWEaa', b'XYaa', b'a'), b'QWEaaXYaaa'),
        ((b'QWEaa', b'XYa', b'aa'), b'QWEaaXYaaa'),
        ((b'aaa',), b'aaa'),
        ((b'Xaaa',), b'Xaaa'),
        ((b'XXX', b'a', b'a', b'a'), b'XXXaaa'),
    )
    reader = asyncio.StreamReader(loop=self.loop)

    for chunks, expected in cases:
        for chunk in chunks:
            reader.feed_data(chunk)
        got = self.loop.run_until_complete(reader.readuntil(b'aaa'))
        self.assertEqual(expected, got)
        self.assertEqual(b'', reader._buffer)
438
def test_readuntil_eof(self):
    # EOF arrives before the separator is complete: readuntil() must
    # raise IncompleteReadError carrying the fed bytes as `partial` and
    # None as `expected`.
    reader = asyncio.StreamReader(loop=self.loop)
    fed = b'some dataAA'
    reader.feed_data(fed)
    reader.feed_eof()

    with self.assertRaisesRegex(asyncio.IncompleteReadError,
                                'undefined expected bytes') as caught:
        self.loop.run_until_complete(reader.readuntil(b'AAA'))
    err = caught.exception
    self.assertEqual(err.partial, fed)
    self.assertIsNone(err.expected)
    self.assertEqual(b'', reader._buffer)
451
def test_readuntil_limit_found_sep(self):
    # With limit=3, readuntil() must raise LimitOverrunError both while
    # the separator is still incomplete ('not found') and once it is
    # complete ('is found').  In both cases the buffered data is expected
    # to be left in place.
    stream = asyncio.StreamReader(loop=self.loop, limit=3)
    stream.feed_data(b'some dataAA')
    # The context managers are not bound to a name: only the exception
    # type and message pattern are checked.
    with self.assertRaisesRegex(asyncio.LimitOverrunError,
                                'not found'):
        self.loop.run_until_complete(stream.readuntil(b'AAA'))

    self.assertEqual(b'some dataAA', stream._buffer)

    # Complete the separator with one more byte.
    stream.feed_data(b'A')
    with self.assertRaisesRegex(asyncio.LimitOverrunError,
                                'is found'):
        self.loop.run_until_complete(stream.readuntil(b'AAA'))

    self.assertEqual(b'some dataAAA', stream._buffer)
467
def test_readexactly_zero_or_less(self):
    # Read exact number of bytes (zero or less).
    run = self.loop.run_until_complete
    reader = asyncio.StreamReader(loop=self.loop)
    reader.feed_data(self.DATA)

    # Zero bytes: empty result, buffer untouched.
    got = run(reader.readexactly(0))
    self.assertEqual(b'', got)
    self.assertEqual(self.DATA, reader._buffer)

    # Negative count: ValueError, buffer untouched.
    with self.assertRaisesRegex(ValueError, 'less than zero'):
        run(reader.readexactly(-1))
    self.assertEqual(self.DATA, reader._buffer)
480
def test_readexactly(self):
    # Read exact number of bytes: DATA is fed three times, two copies are
    # requested, and the third copy must remain in the buffer.
    reader = asyncio.StreamReader(loop=self.loop)

    wanted = 2 * len(self.DATA)
    pending_read = self.loop.create_task(reader.readexactly(wanted))

    def feed_three_times():
        for _ in range(3):
            reader.feed_data(self.DATA)
    self.loop.call_soon(feed_three_times)

    got = self.loop.run_until_complete(pending_read)
    self.assertEqual(self.DATA + self.DATA, got)
    self.assertEqual(self.DATA, reader._buffer)
497
def test_readexactly_limit(self):
    # With limit=3, readexactly(5) must still return the whole 5-byte chunk.
    reader = asyncio.StreamReader(limit=3, loop=self.loop)
    reader.feed_data(b'chunk')
    got = self.loop.run_until_complete(reader.readexactly(5))
    self.assertEqual(b'chunk', got)
    self.assertEqual(b'', reader._buffer)
504
def test_readexactly_eof(self):
    # Read exact number of bytes (eof): only half of the requested bytes
    # are fed before EOF, so IncompleteReadError is expected.
    reader = asyncio.StreamReader(loop=self.loop)
    wanted = 2 * len(self.DATA)
    pending_read = self.loop.create_task(reader.readexactly(wanted))

    def feed_then_eof():
        reader.feed_data(self.DATA)
        reader.feed_eof()
    self.loop.call_soon(feed_then_eof)

    with self.assertRaises(asyncio.IncompleteReadError) as caught:
        self.loop.run_until_complete(pending_read)
    err = caught.exception
    self.assertEqual(err.partial, self.DATA)
    self.assertEqual(err.expected, wanted)
    self.assertEqual(str(err),
                     '18 bytes read on a total of 36 expected bytes')
    self.assertEqual(b'', reader._buffer)
523
def test_readexactly_exception(self):
    # A readexactly() succeeds first; once an exception has been set on
    # the reader, the next readexactly(2) must raise it.
    run = self.loop.run_until_complete
    reader = asyncio.StreamReader(loop=self.loop)
    reader.feed_data(b'line\n')

    got = run(reader.readexactly(2))
    self.assertEqual(b'li', got)

    reader.set_exception(ValueError())
    with self.assertRaises(ValueError):
        run(reader.readexactly(2))
534
def test_exception(self):
    # exception() is None on a new reader and returns the very object
    # passed to set_exception() afterwards.
    reader = asyncio.StreamReader(loop=self.loop)
    self.assertIsNone(reader.exception())

    error = ValueError()
    reader.set_exception(error)
    self.assertIs(reader.exception(), error)
542
def test_exception_waiter(self):
    # One task waits in readline() while another sets an exception on
    # the reader; the waiting task's result must raise that exception.
    reader = asyncio.StreamReader(loop=self.loop)

    async def set_err():
        reader.set_exception(ValueError())

    waiting_read = self.loop.create_task(reader.readline())
    failer = self.loop.create_task(set_err())

    self.loop.run_until_complete(asyncio.wait([waiting_read, failer]))

    with self.assertRaises(ValueError):
        waiting_read.result()
555
def test_exception_cancel(self):
    # Cancel a pending readline() task, then set an exception on the
    # reader; the reader must end up with no waiter.
    loop = self.loop
    reader = asyncio.StreamReader(loop=loop)

    pending_read = loop.create_task(reader.readline())
    test_utils.run_briefly(loop)
    pending_read.cancel()
    test_utils.run_briefly(loop)
    # The following line fails if set_exception() isn't careful.
    reader.set_exception(RuntimeError('message'))
    test_utils.run_briefly(loop)
    self.assertIsNone(reader._waiter)
567
def test_start_server(self):
    # Round-trip one line through asyncio.start_server() twice: once with
    # a coroutine function as the client handler and once with a plain
    # callback that spawns the coroutine as a task.

    class MyServer:
        # Minimal echo server driven synchronously through the given loop.

        def __init__(self, loop):
            self.server = None
            self.loop = loop

        async def handle_client(self, client_reader, client_writer):
            # Echo a single line back, then close the connection.
            data = await client_reader.readline()
            client_writer.write(data)
            await client_writer.drain()
            client_writer.close()
            await client_writer.wait_closed()

        def start(self):
            # Serve on a pre-bound listening socket; port 0 lets the OS
            # pick a free port.  Returns the socket's bound address.
            sock = socket.create_server(('127.0.0.1', 0))
            self.server = self.loop.run_until_complete(
                asyncio.start_server(self.handle_client,
                                     sock=sock))
            return sock.getsockname()

        def handle_client_callback(self, client_reader, client_writer):
            # Non-coroutine handler: schedule the coroutine handler as a task.
            self.loop.create_task(self.handle_client(client_reader,
                                                     client_writer))

        def start_callback(self):
            # Discover a free port by binding and closing a socket, then
            # start the server on that host/port.
            # NOTE(review): the port is released before start_server()
            # binds it again, so another process could grab it in between.
            sock = socket.create_server(('127.0.0.1', 0))
            addr = sock.getsockname()
            sock.close()
            self.server = self.loop.run_until_complete(
                asyncio.start_server(self.handle_client_callback,
                                     host=addr[0], port=addr[1]))
            return addr

        def stop(self):
            # Close the server (if started) and wait until it is closed.
            if self.server is not None:
                self.server.close()
                self.loop.run_until_complete(self.server.wait_closed())
                self.server = None

    async def client(addr):
        reader, writer = await asyncio.open_connection(*addr)
        # send a line
        writer.write(b"hello world!\n")
        # read it back
        msgback = await reader.readline()
        writer.close()
        await writer.wait_closed()
        return msgback

    # Collect anything reported to the loop's exception handler; the test
    # expects this list to stay empty.
    messages = []
    self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

    # test the server variant with a coroutine as client handler
    server = MyServer(self.loop)
    addr = server.start()
    msg = self.loop.run_until_complete(self.loop.create_task(client(addr)))
    server.stop()
    self.assertEqual(msg, b"hello world!\n")

    # test the server variant with a callback as client handler
    server = MyServer(self.loop)
    addr = server.start_callback()
    msg = self.loop.run_until_complete(self.loop.create_task(client(addr)))
    server.stop()
    self.assertEqual(msg, b"hello world!\n")

    self.assertEqual(messages, [])
637
@socket_helper.skip_unless_bind_unix_socket
def test_start_unix_server(self):
    # Round-trip one line through asyncio.start_unix_server() twice: once
    # with a coroutine function as the client handler and once with a
    # plain callback that spawns the coroutine as a task.

    class MyServer:
        # Minimal echo server on a Unix socket path, driven synchronously
        # through the given loop.

        def __init__(self, loop, path):
            self.server = None
            self.loop = loop
            self.path = path

        async def handle_client(self, client_reader, client_writer):
            # Echo a single line back, then close the connection.
            data = await client_reader.readline()
            client_writer.write(data)
            await client_writer.drain()
            client_writer.close()
            await client_writer.wait_closed()

        def start(self):
            # Start serving with the coroutine handler.
            self.server = self.loop.run_until_complete(
                asyncio.start_unix_server(self.handle_client,
                                          path=self.path))

        def handle_client_callback(self, client_reader, client_writer):
            # Non-coroutine handler: schedule the coroutine handler as a task.
            self.loop.create_task(self.handle_client(client_reader,
                                                     client_writer))

        def start_callback(self):
            # Start serving with the plain-callback handler.
            start = asyncio.start_unix_server(self.handle_client_callback,
                                              path=self.path)
            self.server = self.loop.run_until_complete(start)

        def stop(self):
            # Close the server (if started) and wait until it is closed.
            if self.server is not None:
                self.server.close()
                self.loop.run_until_complete(self.server.wait_closed())
                self.server = None

    async def client(path):
        reader, writer = await asyncio.open_unix_connection(path)
        # send a line
        writer.write(b"hello world!\n")
        # read it back
        msgback = await reader.readline()
        writer.close()
        await writer.wait_closed()
        return msgback

    # Collect anything reported to the loop's exception handler; the test
    # expects this list to stay empty.
    messages = []
    self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

    # test the server variant with a coroutine as client handler
    with test_utils.unix_socket_path() as path:
        server = MyServer(self.loop, path)
        server.start()
        msg = self.loop.run_until_complete(
            self.loop.create_task(client(path)))
        server.stop()
        self.assertEqual(msg, b"hello world!\n")

    # test the server variant with a callback as client handler
    with test_utils.unix_socket_path() as path:
        server = MyServer(self.loop, path)
        server.start_callback()
        msg = self.loop.run_until_complete(
            self.loop.create_task(client(path)))
        server.stop()
        self.assertEqual(msg, b"hello world!\n")

    self.assertEqual(messages, [])
707
@unittest.skipIf(ssl is None, 'No ssl module')
def test_start_tls(self):
    # Exchange one line in plain text, upgrade the same connection with
    # StreamWriter.start_tls() on both sides, then exchange a second line.

    class MyServer:
        # Echo server that upgrades the connection after the first line.

        def __init__(self, loop):
            self.server = None
            self.loop = loop

        async def handle_client(self, client_reader, client_writer):
            # First line is echoed before the upgrade.
            data1 = await client_reader.readline()
            client_writer.write(data1)
            await client_writer.drain()
            # No SSL context is expected before start_tls(), one after.
            assert client_writer.get_extra_info('sslcontext') is None
            await client_writer.start_tls(
                test_utils.simple_server_sslcontext())
            assert client_writer.get_extra_info('sslcontext') is not None
            # Second line is echoed after the upgrade.
            data2 = await client_reader.readline()
            client_writer.write(data2)
            await client_writer.drain()
            client_writer.close()
            await client_writer.wait_closed()

        def start(self):
            # Serve on a pre-bound listening socket; port 0 lets the OS
            # pick a free port.  Returns the socket's bound address.
            sock = socket.create_server(('127.0.0.1', 0))
            self.server = self.loop.run_until_complete(
                asyncio.start_server(self.handle_client,
                                     sock=sock))
            return sock.getsockname()

        def stop(self):
            # Close the server (if started) and wait until it is closed.
            if self.server is not None:
                self.server.close()
                self.loop.run_until_complete(self.server.wait_closed())
                self.server = None

    async def client(addr):
        # Mirror of the server side: one line before the upgrade, one after.
        reader, writer = await asyncio.open_connection(*addr)
        writer.write(b"hello world 1!\n")
        await writer.drain()
        msgback1 = await reader.readline()
        assert writer.get_extra_info('sslcontext') is None
        await writer.start_tls(test_utils.simple_client_sslcontext())
        assert writer.get_extra_info('sslcontext') is not None
        writer.write(b"hello world 2!\n")
        await writer.drain()
        msgback2 = await reader.readline()
        writer.close()
        await writer.wait_closed()
        return msgback1, msgback2

    # Collect anything reported to the loop's exception handler; the test
    # expects this list to stay empty.
    messages = []
    self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

    server = MyServer(self.loop)
    addr = server.start()
    msg1, msg2 = self.loop.run_until_complete(client(addr))
    server.stop()

    self.assertEqual(messages, [])
    self.assertEqual(msg1, b"hello world 1!\n")
    self.assertEqual(msg2, b"hello world 2!\n")
770
@unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
def test_read_all_from_pipe_reader(self):
    # See asyncio issue 168.  This test is derived from the example
    # subprocess_attach_read_pipe.py, but we configure the
    # StreamReader's limit so that twice it is less than the size
    # of the data written.  Also we must explicitly attach a child
    # watcher to the event loop.

    # Child program: write b'data' to the fd passed as argv[1], then
    # close it.
    code = """\
import os, sys
fd = int(sys.argv[1])
os.write(fd, b'data')
os.close(fd)
"""
    rfd, wfd = os.pipe()
    args = [sys.executable, '-c', code, str(wfd)]

    # Attach the read end of the pipe to a StreamReader with limit=1.
    pipe = open(rfd, 'rb', 0)
    reader = asyncio.StreamReader(loop=self.loop, limit=1)
    protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
    transport, _ = self.loop.run_until_complete(
        self.loop.connect_read_pipe(lambda: protocol, pipe))

    watcher = asyncio.SafeChildWatcher()
    watcher.attach_loop(self.loop)
    try:
        asyncio.set_child_watcher(watcher)
        # pass_fds hands the write end of the pipe to the child.
        create = asyncio.create_subprocess_exec(
            *args,
            pass_fds={wfd},
        )
        proc = self.loop.run_until_complete(create)
        self.loop.run_until_complete(proc.wait())
    finally:
        # Always uninstall the watcher, even if the subprocess step failed.
        asyncio.set_child_watcher(None)

    # Close the parent's copy of the write end before reading to EOF.
    os.close(wfd)
    data = self.loop.run_until_complete(reader.read(-1))
    self.assertEqual(data, b'data')
810
def test_streamreader_constructor_without_loop(self):
    # Constructing a StreamReader without a loop argument, outside of a
    # coroutine, is expected to raise RuntimeError matching
    # 'no current event loop'.
    with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
        asyncio.StreamReader()
814
def test_streamreader_constructor_use_running_loop(self):
    # asyncio issue #184: Ensure that the StreamReader constructor
    # retrieves the current loop if the loop parameter is not set.
    # The reader is built inside a coroutine run on self.loop.
    async def test():
        return asyncio.StreamReader()

    reader = self.loop.run_until_complete(test())
    self.assertIs(reader._loop, self.loop)
823
def test_streamreader_constructor_use_global_loop(self):
    # asyncio issue #184: Ensure that the StreamReader constructor
    # retrieves the current loop if the loop parameter is not set
    # Deprecated in 3.10, undeprecated in 3.11.1
    # The cleanup is registered first so the current loop is reset to
    # None even if the assertions below fail.
    self.addCleanup(asyncio.set_event_loop, None)
    asyncio.set_event_loop(self.loop)
    reader = asyncio.StreamReader()
    self.assertIs(reader._loop, self.loop)
832
833
def test_streamreaderprotocol_constructor_without_loop(self):
    # Constructing a StreamReaderProtocol without a loop argument is
    # expected to raise RuntimeError matching 'no current event loop'.
    fake_reader = mock.Mock()
    self.assertRaisesRegex(
        RuntimeError, 'no current event loop',
        asyncio.StreamReaderProtocol, fake_reader)
838
def test_streamreaderprotocol_constructor_use_running_loop(self):
    # asyncio issue #184: Ensure that StreamReaderProtocol constructor
    # retrieves the current loop if the loop parameter is not set
    fake_reader = mock.Mock()

    async def build_protocol():
        return asyncio.StreamReaderProtocol(fake_reader)

    built = self.loop.run_until_complete(build_protocol())
    self.assertIs(built._loop, self.loop)
847
def test_streamreaderprotocol_constructor_use_global_loop(self):
    # asyncio issue #184: Ensure that StreamReaderProtocol constructor
    # retrieves the current loop if the loop parameter is not set
    # Deprecated in 3.10, undeprecated in 3.11.1
    test_loop = self.loop
    self.addCleanup(asyncio.set_event_loop, None)
    asyncio.set_event_loop(test_loop)
    fake_reader = mock.Mock()
    built = asyncio.StreamReaderProtocol(fake_reader)
    self.assertIs(built._loop, test_loop)
857
def test_multiple_drain(self):
    # See https://github.com/python/cpython/issues/74116
    # Ten coroutines wait in _drain_helper() on a paused flow-control
    # object; writing is resumed 0.1s later and all ten must finish.
    finished = 0

    async def wait_for_drain(flow):
        nonlocal finished
        await flow._drain_helper()
        finished += 1

    async def main():
        loop = asyncio.get_running_loop()
        flow = asyncio.streams.FlowControlMixin(loop)
        flow.pause_writing()
        loop.call_later(0.1, flow.resume_writing)
        waiters = [wait_for_drain(flow) for _ in range(10)]
        await asyncio.gather(*waiters)
        self.assertEqual(finished, 10)

    self.loop.run_until_complete(main())
876
def test_drain_raises(self):
    # See http://bugs.python.org/issue25441

    # This test should not use asyncio for the mock server; the
    # whole point of the test is to test for a bug in drain()
    # where it never gives up the event loop but the socket is
    # closed on the server side.

    # Collect anything reported to the loop's exception handler; the test
    # expects this list to stay empty.
    messages = []
    self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
    # Used by the server thread to hand its bound address to the test.
    q = queue.Queue()

    def server():
        # Runs in a separate thread.
        # Accept exactly one connection and close it immediately.
        with socket.create_server(('localhost', 0)) as sock:
            addr = sock.getsockname()
            q.put(addr)
            clt, _ = sock.accept()
            clt.close()

    async def client(host, port):
        reader, writer = await asyncio.open_connection(host, port)

        # Keep writing until drain() raises; there is no other exit.
        while True:
            writer.write(b"foo\n")
            await writer.drain()

    # Start the server thread and wait for it to be listening.
    thread = threading.Thread(target=server)
    thread.daemon = True
    thread.start()
    addr = q.get()

    # Should not be stuck in an infinite loop.
    with self.assertRaises((ConnectionResetError, ConnectionAbortedError,
                            BrokenPipeError)):
        self.loop.run_until_complete(client(*addr))

    # Clean up the thread.  (Only on success; on failure, it may
    # be stuck in accept().)
    thread.join()
    self.assertEqual([], messages)
919
def test___repr__(self):
    # repr() of a freshly created reader.
    reader = asyncio.StreamReader(loop=self.loop)
    shown = repr(reader)
    self.assertEqual("<StreamReader>", shown)
923
def test___repr__nondefault_limit(self):
    # repr() of a reader created with limit=123 must show the limit.
    reader = asyncio.StreamReader(loop=self.loop, limit=123)
    shown = repr(reader)
    self.assertEqual("<StreamReader limit=123>", shown)
927
def test___repr__eof(self):
    # repr() after feed_eof() must show the eof marker.
    reader = asyncio.StreamReader(loop=self.loop)
    reader.feed_eof()
    shown = repr(reader)
    self.assertEqual("<StreamReader eof>", shown)
932
def test___repr__data(self):
    # repr() after feeding four bytes must show the buffered byte count.
    reader = asyncio.StreamReader(loop=self.loop)
    reader.feed_data(b'data')
    shown = repr(reader)
    self.assertEqual("<StreamReader 4 bytes>", shown)
937
def test___repr__exception(self):
    # repr() after set_exception() must show the exception.
    reader = asyncio.StreamReader(loop=self.loop)
    reader.set_exception(RuntimeError())
    shown = repr(reader)
    self.assertEqual("<StreamReader exception=RuntimeError()>", shown)
944
def test___repr__waiter(self):
    # repr() must show a pending waiter while one is installed, and go
    # back to the plain form once the waiter has been removed.
    reader = asyncio.StreamReader(loop=self.loop)
    waiter = asyncio.Future(loop=self.loop)
    reader._waiter = waiter
    self.assertRegex(
        repr(reader),
        r"<StreamReader waiter=<Future pending[\S ]*>>")
    waiter.set_result(None)
    self.loop.run_until_complete(waiter)
    reader._waiter = None
    self.assertEqual("<StreamReader>", repr(reader))
955
def test___repr__transport(self):
    # repr() must embed the repr of the transport installed on the reader.
    reader = asyncio.StreamReader(loop=self.loop)
    fake_transport = mock.Mock()
    reader._transport = fake_transport
    fake_transport.__repr__ = mock.Mock(return_value="<Transport>")
    self.assertEqual("<StreamReader transport=<Transport>>", repr(reader))
962
def test_IncompleteReadError_pickleable(self):
    # Round-trip an IncompleteReadError through every pickle protocol and
    # compare its string form, `partial` and `expected`.
    original = asyncio.IncompleteReadError(b'abc', 10)
    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        with self.subTest(pickle_protocol=proto):
            blob = pickle.dumps(original, protocol=proto)
            clone = pickle.loads(blob)
            self.assertEqual(str(original), str(clone))
            self.assertEqual(original.partial, clone.partial)
            self.assertEqual(original.expected, clone.expected)
971
972 def test_LimitOverrunError_pickleable(self):
973 e = asyncio.LimitOverrunError('message', 10)
974 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
975 with self.subTest(pickle_protocol=proto):
976 e2 = pickle.loads(pickle.dumps(e, protocol=proto))
977 self.assertEqual(str(e), str(e2))
978 self.assertEqual(e.consumed, e2.consumed)
979
def test_wait_closed_on_close(self):
    run = self.loop.run_until_complete
    with test_utils.run_test_server() as httpd:
        reader, writer = run(asyncio.open_connection(*httpd.address))

        # Issue a request and read the whole response first.
        writer.write(b'GET / HTTP/1.0\r\n\r\n')
        status_line = run(reader.readline())
        self.assertEqual(status_line, b'HTTP/1.0 200 OK\r\n')
        remainder = run(reader.read())
        self.assertTrue(remainder.endswith(b'\r\n\r\nTest message'))

        # is_closing() must flip from False to True across close(), and
        # wait_closed() must then complete.
        self.assertFalse(writer.is_closing())
        writer.close()
        self.assertTrue(writer.is_closing())
        run(writer.wait_closed())
996
def test_wait_closed_on_close_with_unread_data(self):
    run = self.loop.run_until_complete
    with test_utils.run_test_server() as httpd:
        reader, writer = run(asyncio.open_connection(*httpd.address))

        writer.write(b'GET / HTTP/1.0\r\n\r\n')
        # Only the status line is read here; nothing else is consumed
        # from the reader before the writer is closed.
        status_line = run(reader.readline())
        self.assertEqual(status_line, b'HTTP/1.0 200 OK\r\n')
        writer.close()
        run(writer.wait_closed())
1008
def test_async_writer_api(self):
    # Record every context passed to the loop's exception handler so the
    # test can assert that none were reported.
    contexts = []
    self.loop.set_exception_handler(lambda loop, ctx: contexts.append(ctx))

    async def inner(httpd):
        reader, writer = await asyncio.open_connection(*httpd.address)

        writer.write(b'GET / HTTP/1.0\r\n\r\n')
        status_line = await reader.readline()
        self.assertEqual(status_line, b'HTTP/1.0 200 OK\r\n')
        remainder = await reader.read()
        self.assertTrue(remainder.endswith(b'\r\n\r\nTest message'))
        writer.close()
        await writer.wait_closed()

    with test_utils.run_test_server() as httpd:
        self.loop.run_until_complete(inner(httpd))

    self.assertEqual(contexts, [])
1028
def test_async_writer_api_exception_after_close(self):
    # Record every context passed to the loop's exception handler so the
    # test can assert that none were reported.
    contexts = []
    self.loop.set_exception_handler(lambda loop, ctx: contexts.append(ctx))

    async def inner(httpd):
        reader, writer = await asyncio.open_connection(*httpd.address)

        writer.write(b'GET / HTTP/1.0\r\n\r\n')
        status_line = await reader.readline()
        self.assertEqual(status_line, b'HTTP/1.0 200 OK\r\n')
        remainder = await reader.read()
        self.assertTrue(remainder.endswith(b'\r\n\r\nTest message'))
        writer.close()

        # Writing and draining after close() is expected to raise
        # ConnectionResetError.
        with self.assertRaises(ConnectionResetError):
            writer.write(b'data')
            await writer.drain()

    with test_utils.run_test_server() as httpd:
        self.loop.run_until_complete(inner(httpd))

    self.assertEqual(contexts, [])
1050
def test_eof_feed_when_closing_writer(self):
    # See http://bugs.python.org/issue35065
    contexts = []
    self.loop.set_exception_handler(lambda loop, ctx: contexts.append(ctx))
    run = self.loop.run_until_complete

    with test_utils.run_test_server() as httpd:
        reader, writer = run(asyncio.open_connection(*httpd.address))

        # Close straight away, without writing or reading anything.
        writer.close()
        run(writer.wait_closed())

        # After the writer is closed the reader must report EOF and a
        # read must yield no data.
        self.assertTrue(reader.at_eof())
        leftover = run(reader.read())
        self.assertEqual(leftover, b'')

    self.assertEqual(contexts, [])
1069
def test_unclosed_resource_warnings(self):
    # Checks that dropping a StreamWriter without calling close() produces
    # exactly one ResourceWarning whose message starts with
    # "unclosed <StreamWriter".
    async def inner(httpd):
        rd, wr = await asyncio.open_connection(*httpd.address)

        # Perform a complete request/response exchange first.
        wr.write(b'GET / HTTP/1.0\r\n\r\n')
        data = await rd.readline()
        self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
        data = await rd.read()
        self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
        # wr.close() is never called in this coroutine: the local name is
        # deleted and a collection is forced so that the warning is raised
        # while the assertWarns block is active.
        # NOTE(review): this relies on `wr` being the last reference that
        # keeps the writer alive - confirm before adding new locals here.
        with self.assertWarns(ResourceWarning) as cm:
            del wr
            gc.collect()
            self.assertEqual(len(cm.warnings), 1)
            self.assertTrue(str(cm.warnings[0].message).startswith("unclosed <StreamWriter"))

    # Record every context passed to the loop's exception handler; the
    # final assertion requires that none were reported.
    messages = []
    self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

    with test_utils.run_test_server() as httpd:
        self.loop.run_until_complete(inner(httpd))

    self.assertEqual(messages, [])
1092
def test_loop_is_closed_resource_warnings(self):
    # Checks the ResourceWarning emitted when a writer is dropped once the
    # loop reports itself closed: exactly one warning with the message
    # "loop is closed" is expected.
    async def inner(httpd):
        rd, wr = await asyncio.open_connection(*httpd.address)

        # Perform a complete request/response exchange first.
        wr.write(b'GET / HTTP/1.0\r\n\r\n')
        data = await rd.readline()
        self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
        data = await rd.read()
        self.assertTrue(data.endswith(b'\r\n\r\nTest message'))

        # Make "loop is closed" occur first before "del wr" for this test.
        self.loop.stop()
        wr.close()
        # NOTE(review): nothing in this method closes the loop, so the
        # is_closed() condition presumably only becomes true once the test
        # fixture tears the loop down - confirm against tearDown().
        while not self.loop.is_closed():
            await asyncio.sleep(0.0)

        # Drop the writer and force a collection inside assertWarns so the
        # warning, if any, is captured in cm.warnings.
        with self.assertWarns(ResourceWarning) as cm:
            del wr
            gc.collect()
            self.assertEqual(len(cm.warnings), 1)
            self.assertEqual("loop is closed", str(cm.warnings[0].message))

    # Record every context passed to the loop's exception handler; the
    # final assertion requires that none were reported.
    messages = []
    self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

    with test_utils.run_test_server() as httpd:
        with self.assertRaises(RuntimeError):
            # This exception is caused by `self.loop.stop()` as expected.
            self.loop.run_until_complete(inner(httpd))
        # Force a collection while the test server is still running.
        gc.collect()

    self.assertEqual(messages, [])
1125
def test_unhandled_exceptions(self) -> None:
    port = socket_helper.find_unused_port()

    # Record every context passed to the loop's exception handler.
    contexts = []
    self.loop.set_exception_handler(lambda loop, ctx: contexts.append(ctx))

    async def client():
        reader, writer = await asyncio.open_connection('localhost', port)
        writer.write(b'test msg')
        await writer.drain()
        writer.close()
        await writer.wait_closed()

    async def main():
        # The connection callback raises unconditionally.
        async def handle_echo(reader, writer):
            raise Exception('test')

        server = await asyncio.start_server(handle_echo, 'localhost', port)
        await server.start_serving()
        await client()
        server.close()
        await server.wait_closed()

    self.loop.run_until_complete(main())

    # The first reported context must describe the failing callback.
    self.assertEqual(
        contexts[0]['message'],
        'Unhandled exception in client_connected_cb',
    )
    # Break explicitly reference cycle
    contexts = None
1156
1157
# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()