python (3.12.0)
1 """Tests for proactor_events.py"""
2
3 import io
4 import socket
5 import unittest
6 import sys
7 from unittest import mock
8
9 import asyncio
10 from asyncio.proactor_events import BaseProactorEventLoop
11 from asyncio.proactor_events import _ProactorSocketTransport
12 from asyncio.proactor_events import _ProactorWritePipeTransport
13 from asyncio.proactor_events import _ProactorDuplexPipeTransport
14 from asyncio.proactor_events import _ProactorDatagramTransport
15 from test.support import os_helper
16 from test.support import socket_helper
17 from test.test_asyncio import utils as test_utils
18
19
def tearDownModule():
    """Reset the global asyncio event loop policy after this module's tests."""
    # Passing None asks asyncio to go back to its default policy.
    asyncio.set_event_loop_policy(None)
22
23
def close_transport(transport):
    """Close the socket held by *transport* and clear the reference.

    transport.close() is deliberately not called because the event loop
    and the IOCP proactor are mocked.  Does nothing when the transport's
    ``_sock`` attribute is already None.
    """
    if transport._sock is None:
        return
    transport._sock.close()
    transport._sock = None
31
32
class ProactorSocketTransportTests(test_utils.TestCase):
    """Tests for _ProactorSocketTransport running against a mocked proactor."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.addCleanup(self.loop.close)
        # The proactor is a mock, so no real overlapped I/O is performed.
        self.proactor = mock.Mock()
        self.loop._proactor = self.proactor
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        self.sock = mock.Mock(socket.socket)
        # Size of the bytearray the tests expect to see in recv_into() calls.
        self.buffer_size = 65536

    def socket_transport(self, waiter=None):
        """Return a socket transport over the mocked socket.

        A cleanup closing the transport's socket is registered.
        """
        transport = _ProactorSocketTransport(self.loop, self.sock,
                                             self.protocol, waiter=waiter)
        self.addCleanup(close_transport, transport)
        return transport

    def test_ctor(self):
        fut = self.loop.create_future()
        tr = self.socket_transport(waiter=fut)
        test_utils.run_briefly(self.loop)
        self.assertIsNone(fut.result())
        # Assert on the mock rather than calling it: calling the mock
        # would verify nothing.
        self.protocol.connection_made.assert_called_with(tr)
        self.proactor.recv_into.assert_called_with(
            self.sock, bytearray(self.buffer_size))

    def test_loop_reading(self):
        tr = self.socket_transport()
        tr._loop_reading()
        self.loop._proactor.recv_into.assert_called_with(
            self.sock, bytearray(self.buffer_size))
        self.assertFalse(self.protocol.data_received.called)
        self.assertFalse(self.protocol.eof_received.called)

    def test_loop_reading_data(self):
        buf = b'data'
        res = self.loop.create_future()
        res.set_result(len(buf))

        tr = self.socket_transport()
        tr._read_fut = res
        tr._data[:len(buf)] = buf
        tr._loop_reading(res)
        called_buf = bytearray(self.buffer_size)
        called_buf[:len(buf)] = buf
        self.loop._proactor.recv_into.assert_called_with(
            self.sock, called_buf)
        self.protocol.data_received.assert_called_with(buf)
        # assert_called_with maps bytearray and bytes to the same thing
        # so check manually
        # regression test for https://github.com/python/cpython/issues/99941
        self.assertIsInstance(
            self.protocol.data_received.call_args.args[0], bytes)

    @unittest.skipIf(sys.flags.optimize,
                     "Assertions are disabled in optimized mode")
    def test_loop_reading_no_data(self):
        res = self.loop.create_future()
        res.set_result(0)

        tr = self.socket_transport()
        # Passing the future before tr._read_fut refers to it trips an
        # assertion inside _loop_reading().
        self.assertRaises(AssertionError, tr._loop_reading, res)

        tr.close = mock.Mock()
        tr._read_fut = res
        tr._loop_reading(res)
        # A zero-length read is reported as EOF and closes the transport.
        self.assertFalse(self.loop._proactor.recv_into.called)
        self.assertTrue(self.protocol.eof_received.called)
        self.assertTrue(tr.close.called)

    def test_loop_reading_aborted(self):
        err = self.loop._proactor.recv_into.side_effect = (
            ConnectionAbortedError())

        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        tr._fatal_error.assert_called_with(
            err,
            'Fatal read error on pipe transport')

    def test_loop_reading_aborted_closing(self):
        self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()

        tr = self.socket_transport()
        # While the transport is closing, an aborted read is not fatal.
        tr._closing = True
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        self.assertFalse(tr._fatal_error.called)

    def test_loop_reading_aborted_is_fatal(self):
        self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
        tr = self.socket_transport()
        tr._closing = False
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        self.assertTrue(tr._fatal_error.called)

    def test_loop_reading_conn_reset_lost(self):
        err = self.loop._proactor.recv_into.side_effect = (
            ConnectionResetError())

        tr = self.socket_transport()
        tr._closing = False
        tr._fatal_error = mock.Mock()
        tr._force_close = mock.Mock()
        tr._loop_reading()
        # A connection reset force-closes the transport without going
        # through _fatal_error().
        self.assertFalse(tr._fatal_error.called)
        tr._force_close.assert_called_with(err)

    def test_loop_reading_exception(self):
        err = self.loop._proactor.recv_into.side_effect = (OSError())

        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        tr._fatal_error.assert_called_with(
            err,
            'Fatal read error on pipe transport')

    def test_write(self):
        tr = self.socket_transport()
        tr._loop_writing = mock.Mock()
        tr.write(b'data')
        # With no write in flight the data is handed straight to
        # _loop_writing() and nothing is buffered.
        self.assertEqual(tr._buffer, None)
        tr._loop_writing.assert_called_with(data=b'data')

    def test_write_no_data(self):
        tr = self.socket_transport()
        tr.write(b'')
        self.assertFalse(tr._buffer)

    def test_write_more(self):
        tr = self.socket_transport()
        # Simulate a write already in flight: new data must be buffered.
        tr._write_fut = mock.Mock()
        tr._loop_writing = mock.Mock()
        tr.write(b'data')
        self.assertEqual(tr._buffer, b'data')
        self.assertFalse(tr._loop_writing.called)

    def test_loop_writing(self):
        tr = self.socket_transport()
        tr._buffer = bytearray(b'data')
        tr._loop_writing()
        self.loop._proactor.send.assert_called_with(self.sock, b'data')
        self.loop._proactor.send.return_value.add_done_callback.\
            assert_called_with(tr._loop_writing)

    @mock.patch('asyncio.proactor_events.logger')
    def test_loop_writing_err(self, m_log):
        err = self.loop._proactor.send.side_effect = OSError()
        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._buffer = [b'da', b'ta']
        tr._loop_writing()
        tr._fatal_error.assert_called_with(
            err,
            'Fatal write error on pipe transport')
        tr._conn_lost = 1

        # Repeated writes on a lost connection eventually log a warning.
        tr.write(b'data')
        tr.write(b'data')
        tr.write(b'data')
        tr.write(b'data')
        tr.write(b'data')
        self.assertEqual(tr._buffer, None)
        m_log.warning.assert_called_with('socket.send() raised exception.')

    def test_loop_writing_stop(self):
        fut = self.loop.create_future()
        fut.set_result(b'data')

        tr = self.socket_transport()
        tr._write_fut = fut
        tr._loop_writing(fut)
        self.assertIsNone(tr._write_fut)

    def test_loop_writing_closing(self):
        fut = self.loop.create_future()
        fut.set_result(1)

        tr = self.socket_transport()
        tr._write_fut = fut
        tr.close()
        tr._loop_writing(fut)
        self.assertIsNone(tr._write_fut)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)

    def test_abort(self):
        tr = self.socket_transport()
        tr._force_close = mock.Mock()
        tr.abort()
        tr._force_close.assert_called_with(None)

    def test_close(self):
        tr = self.socket_transport()
        tr.close()
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertTrue(tr.is_closing())
        self.assertEqual(tr._conn_lost, 1)

        # A second close() must not notify the protocol again.
        self.protocol.connection_lost.reset_mock()
        tr.close()
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_close_write_fut(self):
        tr = self.socket_transport()
        # With a write still pending, close() must not report
        # connection_lost yet.
        tr._write_fut = mock.Mock()
        tr.close()
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_close_buffer(self):
        tr = self.socket_transport()
        # With buffered data, close() must not report connection_lost yet.
        tr._buffer = [b'data']
        tr.close()
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_close_invalid_sockobj(self):
        tr = self.socket_transport()
        self.sock.fileno.return_value = -1
        tr.close()
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertFalse(self.sock.shutdown.called)

    @mock.patch('asyncio.base_events.logger')
    def test_fatal_error(self, m_logging):
        tr = self.socket_transport()
        tr._force_close = mock.Mock()
        tr._fatal_error(None)
        self.assertTrue(tr._force_close.called)
        self.assertTrue(m_logging.error.called)

    def test_force_close(self):
        tr = self.socket_transport()
        tr._buffer = [b'data']
        read_fut = tr._read_fut = mock.Mock()
        write_fut = tr._write_fut = mock.Mock()
        tr._force_close(None)

        # Pending futures are cancelled and buffered data is dropped.
        read_fut.cancel.assert_called_with()
        write_fut.cancel.assert_called_with()
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertEqual(None, tr._buffer)
        self.assertEqual(tr._conn_lost, 1)

    def test_loop_writing_force_close(self):
        exc_handler = mock.Mock()
        self.loop.set_exception_handler(exc_handler)
        fut = self.loop.create_future()
        fut.set_result(1)
        self.proactor.send.return_value = fut

        tr = self.socket_transport()
        tr.write(b'data')
        tr._force_close(None)
        test_utils.run_briefly(self.loop)
        # Force-closing with a completed write future must not reach the
        # loop's exception handler.
        exc_handler.assert_not_called()

    def test_force_close_idempotent(self):
        tr = self.socket_transport()
        tr._closing = True
        tr._force_close(None)
        test_utils.run_briefly(self.loop)
        # See https://github.com/python/cpython/issues/89237
        # `protocol.connection_lost` should be called even if
        # the transport was closed forcefully otherwise
        # the resources held by protocol will never be freed
        # and waiters will never be notified leading to hang.
        self.assertTrue(self.protocol.connection_lost.called)

    def test_force_close_protocol_connection_lost_once(self):
        tr = self.socket_transport()
        self.assertFalse(self.protocol.connection_lost.called)
        tr._closing = True
        # Calling _force_close twice should not call
        # protocol.connection_lost twice
        tr._force_close(None)
        tr._force_close(None)
        test_utils.run_briefly(self.loop)
        self.assertEqual(1, self.protocol.connection_lost.call_count)

    def test_close_protocol_connection_lost_once(self):
        tr = self.socket_transport()
        self.assertFalse(self.protocol.connection_lost.called)
        # Calling close twice should not call
        # protocol.connection_lost twice
        tr.close()
        tr.close()
        test_utils.run_briefly(self.loop)
        self.assertEqual(1, self.protocol.connection_lost.call_count)

    def test_fatal_error_2(self):
        tr = self.socket_transport()
        tr._buffer = [b'data']
        tr._force_close(None)

        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertEqual(None, tr._buffer)

    def test_call_connection_lost(self):
        tr = self.socket_transport()
        tr._call_connection_lost(None)
        self.assertTrue(self.protocol.connection_lost.called)
        self.assertTrue(self.sock.close.called)

    def test_write_eof(self):
        tr = self.socket_transport()
        self.assertTrue(tr.can_write_eof())
        tr.write_eof()
        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
        # A second write_eof() must not shut the socket down again.
        tr.write_eof()
        self.assertEqual(self.sock.shutdown.call_count, 1)
        tr.close()

    def test_write_eof_buffer(self):
        tr = self.socket_transport()
        f = self.loop.create_future()
        tr._loop._proactor.send.return_value = f
        tr.write(b'data')
        tr.write_eof()
        # The shutdown is deferred until the pending write completes.
        self.assertTrue(tr._eof_written)
        self.assertFalse(self.sock.shutdown.called)
        tr._loop._proactor.send.assert_called_with(self.sock, b'data')
        f.set_result(4)
        self.loop._run_once()
        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
        tr.close()

    def test_write_eof_write_pipe(self):
        tr = _ProactorWritePipeTransport(
            self.loop, self.sock, self.protocol)
        self.assertTrue(tr.can_write_eof())
        tr.write_eof()
        self.assertTrue(tr.is_closing())
        self.loop._run_once()
        self.assertTrue(self.sock.close.called)
        tr.close()

    def test_write_eof_buffer_write_pipe(self):
        tr = _ProactorWritePipeTransport(self.loop, self.sock, self.protocol)
        f = self.loop.create_future()
        tr._loop._proactor.send.return_value = f
        tr.write(b'data')
        tr.write_eof()
        self.assertTrue(tr.is_closing())
        self.assertFalse(self.sock.shutdown.called)
        tr._loop._proactor.send.assert_called_with(self.sock, b'data')
        f.set_result(4)
        self.loop._run_once()
        self.loop._run_once()
        self.assertTrue(self.sock.close.called)
        tr.close()

    def test_write_eof_duplex_pipe(self):
        tr = _ProactorDuplexPipeTransport(
            self.loop, self.sock, self.protocol)
        self.assertFalse(tr.can_write_eof())
        with self.assertRaises(NotImplementedError):
            tr.write_eof()
        close_transport(tr)

    def test_pause_resume_reading(self):
        tr = self.socket_transport()
        msgs = [b'data1', b'data2', b'data3', b'data4', b'data5', b'']
        reversed_msgs = list(reversed(msgs))

        def recv_into(sock, data):
            # Fake proactor.recv_into(): each call consumes the next
            # message and returns an already completed future.
            f = self.loop.create_future()
            msg = reversed_msgs.pop()

            result = f.result
            def monkey():
                # Copy the message into the caller's buffer only when
                # the result is actually fetched.
                data[:len(msg)] = msg
                return result()
            f.result = monkey

            f.set_result(len(msg))
            return f

        self.loop._proactor.recv_into.side_effect = recv_into
        self.loop._run_once()
        self.assertFalse(tr._paused)
        self.assertTrue(tr.is_reading())

        for msg in msgs[:2]:
            self.loop._run_once()
            self.protocol.data_received.assert_called_with(bytearray(msg))

        # pause_reading() is called twice on purpose: the second call
        # must be harmless.
        tr.pause_reading()
        tr.pause_reading()
        self.assertTrue(tr._paused)
        self.assertFalse(tr.is_reading())
        for _ in range(10):
            self.loop._run_once()
            # No new data is delivered while paused.
            self.protocol.data_received.assert_called_with(
                bytearray(msgs[1]))

        tr.resume_reading()
        tr.resume_reading()
        self.assertFalse(tr._paused)
        self.assertTrue(tr.is_reading())

        for msg in msgs[2:4]:
            self.loop._run_once()
            self.protocol.data_received.assert_called_with(bytearray(msg))

        # Pausing and immediately resuming must not raise through the
        # loop's exception handler.
        tr.pause_reading()
        tr.resume_reading()
        self.loop.call_exception_handler = mock.Mock()
        self.loop._run_once()
        self.loop.call_exception_handler.assert_not_called()
        self.protocol.data_received.assert_called_with(bytearray(msgs[4]))
        tr.close()

        self.assertFalse(tr.is_reading())

    def test_pause_reading_connection_made(self):
        tr = self.socket_transport()
        # Pause reading from inside the connection_made() callback.
        self.protocol.connection_made.side_effect = (
            lambda _: tr.pause_reading())
        test_utils.run_briefly(self.loop)
        self.assertFalse(tr.is_reading())
        self.loop.assert_no_reader(7)

        tr.resume_reading()
        self.assertTrue(tr.is_reading())

        tr.close()
        self.assertFalse(tr.is_reading())

    def pause_writing_transport(self, high):
        """Return a transport whose write-buffer high-water mark is *high*.

        Also checks that the buffer starts empty and that the protocol
        has not been asked to pause or resume writing yet.
        """
        tr = self.socket_transport()
        tr.set_write_buffer_limits(high=high)

        self.assertEqual(tr.get_write_buffer_size(), 0)
        self.assertFalse(self.protocol.pause_writing.called)
        self.assertFalse(self.protocol.resume_writing.called)
        return tr

    def test_pause_resume_writing(self):
        tr = self.pause_writing_transport(high=4)

        # write a large chunk, must pause writing
        fut = self.loop.create_future()
        self.loop._proactor.send.return_value = fut
        tr.write(b'large data')
        self.loop._run_once()
        self.assertTrue(self.protocol.pause_writing.called)

        # flush the buffer
        fut.set_result(None)
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 0)
        self.assertTrue(self.protocol.resume_writing.called)

    def test_pause_writing_2write(self):
        tr = self.pause_writing_transport(high=4)

        # first short write, the buffer is not full (3 <= 4)
        fut1 = self.loop.create_future()
        self.loop._proactor.send.return_value = fut1
        tr.write(b'123')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 3)
        self.assertFalse(self.protocol.pause_writing.called)

        # fill the buffer, must pause writing (6 > 4)
        tr.write(b'abc')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 6)
        self.assertTrue(self.protocol.pause_writing.called)

    def test_pause_writing_3write(self):
        tr = self.pause_writing_transport(high=4)

        # first short write, the buffer is not full (1 <= 4)
        fut = self.loop.create_future()
        self.loop._proactor.send.return_value = fut
        tr.write(b'1')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 1)
        self.assertFalse(self.protocol.pause_writing.called)

        # second short write, the buffer is not full (3 <= 4)
        tr.write(b'23')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 3)
        self.assertFalse(self.protocol.pause_writing.called)

        # fill the buffer, must pause writing (6 > 4)
        tr.write(b'abc')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 6)
        self.assertTrue(self.protocol.pause_writing.called)

    def test_dont_pause_writing(self):
        tr = self.pause_writing_transport(high=4)

        # write a large chunk which completes immediately,
        # it should not pause writing
        fut = self.loop.create_future()
        fut.set_result(None)
        self.loop._proactor.send.return_value = fut
        tr.write(b'very large data')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 0)
        self.assertFalse(self.protocol.pause_writing.called)
541
542
class ProactorDatagramTransportTests(test_utils.TestCase):
    """Tests for _ProactorDatagramTransport running against a mocked proactor."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        # The proactor is a mock: sends are observed through
        # self.proactor.send / self.proactor.sendto, not through the socket.
        self.proactor = mock.Mock()
        self.loop._proactor = self.proactor
        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
        self.sock = mock.Mock(spec_set=socket.socket)
        self.sock.fileno.return_value = 7

    def datagram_transport(self, address=None):
        """Return a datagram transport over the mocked socket.

        When *address* is falsy the socket's getpeername() is made to
        raise OSError.  A cleanup closing the transport's socket is
        registered.
        """
        self.sock.getpeername.side_effect = None if address else OSError
        transport = _ProactorDatagramTransport(self.loop, self.sock,
                                               self.protocol,
                                               address=address)
        self.addCleanup(close_transport, transport)
        return transport

    def test_sendto(self):
        data = b'data'
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.proactor.sendto.called)
        self.proactor.sendto.assert_called_with(
            self.sock, data, addr=('0.0.0.0', 1234))

    def test_sendto_bytearray(self):
        data = bytearray(b'data')
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.proactor.sendto.called)
        self.proactor.sendto.assert_called_with(
            self.sock, b'data', addr=('0.0.0.0', 1234))

    def test_sendto_memoryview(self):
        data = memoryview(b'data')
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.proactor.sendto.called)
        self.proactor.sendto.assert_called_with(
            self.sock, b'data', addr=('0.0.0.0', 1234))

    def test_sendto_no_data(self):
        transport = self.datagram_transport()
        transport._buffer.append((b'data', ('0.0.0.0', 12345)))
        transport.sendto(b'', ())
        # Check the proactor mock: checking self.sock.sendto would prove
        # nothing because the transport sends through the proactor.
        self.assertFalse(self.proactor.sendto.called)
        self.assertEqual(
            [(b'data', ('0.0.0.0', 12345))], list(transport._buffer))

    def test_sendto_buffer(self):
        transport = self.datagram_transport()
        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
        # Simulate a write already in flight: new data must be buffered.
        transport._write_fut = object()
        transport.sendto(b'data2', ('0.0.0.0', 12345))
        self.assertFalse(self.proactor.sendto.called)
        self.assertEqual(
            [(b'data1', ('0.0.0.0', 12345)),
             (b'data2', ('0.0.0.0', 12345))],
            list(transport._buffer))

    def test_sendto_buffer_bytearray(self):
        data2 = bytearray(b'data2')
        transport = self.datagram_transport()
        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
        transport._write_fut = object()
        transport.sendto(data2, ('0.0.0.0', 12345))
        self.assertFalse(self.proactor.sendto.called)
        self.assertEqual(
            [(b'data1', ('0.0.0.0', 12345)),
             (b'data2', ('0.0.0.0', 12345))],
            list(transport._buffer))
        # The buffered payload must have been converted to bytes.
        self.assertIsInstance(transport._buffer[1][0], bytes)

    def test_sendto_buffer_memoryview(self):
        data2 = memoryview(b'data2')
        transport = self.datagram_transport()
        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
        transport._write_fut = object()
        transport.sendto(data2, ('0.0.0.0', 12345))
        self.assertFalse(self.proactor.sendto.called)
        self.assertEqual(
            [(b'data1', ('0.0.0.0', 12345)),
             (b'data2', ('0.0.0.0', 12345))],
            list(transport._buffer))
        # The buffered payload must have been converted to bytes.
        self.assertIsInstance(transport._buffer[1][0], bytes)

    @mock.patch('asyncio.proactor_events.logger')
    def test_sendto_exception(self, m_log):
        data = b'data'
        err = self.proactor.sendto.side_effect = RuntimeError()

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport.sendto(data, ())

        self.assertTrue(transport._fatal_error.called)
        transport._fatal_error.assert_called_with(
            err,
            'Fatal write error on datagram transport')
        transport._conn_lost = 1

        # Repeated sends on a lost, connected transport eventually log
        # a warning.
        transport._address = ('123',)
        transport.sendto(data)
        transport.sendto(data)
        transport.sendto(data)
        transport.sendto(data)
        transport.sendto(data)
        m_log.warning.assert_called_with('socket.sendto() raised exception.')

    def test_sendto_error_received(self):
        data = b'data'

        # The error is injected on the proactor mock: the transport never
        # calls self.sock.sendto, so a side effect placed there would
        # never fire and the test would pass vacuously.
        self.proactor.sendto.side_effect = ConnectionRefusedError

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport.sendto(data, ())

        self.assertEqual(transport._conn_lost, 0)
        self.assertFalse(transport._fatal_error.called)
        self.assertTrue(self.protocol.error_received.called)

    def test_sendto_error_received_connected(self):
        data = b'data'

        self.proactor.send.side_effect = ConnectionRefusedError

        transport = self.datagram_transport(address=('0.0.0.0', 1))
        transport._fatal_error = mock.Mock()
        transport.sendto(data)

        self.assertFalse(transport._fatal_error.called)
        self.assertTrue(self.protocol.error_received.called)

    def test_sendto_str(self):
        transport = self.datagram_transport()
        self.assertRaises(TypeError, transport.sendto, 'str', ())

    def test_sendto_connected_addr(self):
        transport = self.datagram_transport(address=('0.0.0.0', 1))
        # Sending to an address other than the connected one is rejected.
        self.assertRaises(
            ValueError, transport.sendto, b'str', ('0.0.0.0', 2))

    def test_sendto_closing(self):
        transport = self.datagram_transport(address=(1,))
        transport.close()
        self.assertEqual(transport._conn_lost, 1)
        # Each send attempted after close bumps the lost-connection counter.
        transport.sendto(b'data', (1,))
        self.assertEqual(transport._conn_lost, 2)

    def test__loop_writing_closing(self):
        transport = self.datagram_transport()
        transport._closing = True
        transport._loop_writing()
        self.assertIsNone(transport._write_fut)
        test_utils.run_briefly(self.loop)
        self.sock.close.assert_called_with()
        self.protocol.connection_lost.assert_called_with(None)

    def test__loop_writing_exception(self):
        err = self.proactor.sendto.side_effect = RuntimeError()

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport._buffer.append((b'data', ()))
        transport._loop_writing()

        transport._fatal_error.assert_called_with(
            err,
            'Fatal write error on datagram transport')

    def test__loop_writing_error_received(self):
        self.proactor.sendto.side_effect = ConnectionRefusedError

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport._buffer.append((b'data', ()))
        transport._loop_writing()

        self.assertFalse(transport._fatal_error.called)

    def test__loop_writing_error_received_connection(self):
        self.proactor.send.side_effect = ConnectionRefusedError

        transport = self.datagram_transport(address=('0.0.0.0', 1))
        transport._fatal_error = mock.Mock()
        transport._buffer.append((b'data', ()))
        transport._loop_writing()

        self.assertFalse(transport._fatal_error.called)
        self.assertTrue(self.protocol.error_received.called)

    @mock.patch('asyncio.base_events.logger.error')
    def test_fatal_error_connected(self, m_exc):
        transport = self.datagram_transport(address=('0.0.0.0', 1))
        err = ConnectionRefusedError()
        transport._fatal_error(err)
        # Neither the protocol nor the error logger is notified here.
        self.assertFalse(self.protocol.error_received.called)
        m_exc.assert_not_called()
743
744
class BaseProactorEventLoopTests(test_utils.TestCase):
    """Tests for BaseProactorEventLoop built on a mocked proactor."""

    def setUp(self):
        super().setUp()

        self.sock = test_utils.mock_nonblocking_socket()
        self.proactor = mock.Mock()

        self.ssock, self.csock = mock.Mock(), mock.Mock()

        # Build the loop with its self-pipe socket pair and the wakeup-fd
        # registration patched out, so that construction uses the mocks.
        with mock.patch('asyncio.proactor_events.socket.socketpair',
                        return_value=(self.ssock, self.csock)):
            with mock.patch('signal.set_wakeup_fd'):
                self.loop = BaseProactorEventLoop(self.proactor)
        self.set_event_loop(self.loop)

    @mock.patch('asyncio.proactor_events.socket.socketpair')
    def test_ctor(self, socketpair):
        ssock, csock = socketpair.return_value = (
            mock.Mock(), mock.Mock())
        with mock.patch('signal.set_wakeup_fd'):
            loop = BaseProactorEventLoop(self.proactor)
        self.assertIs(loop._ssock, ssock)
        self.assertIs(loop._csock, csock)
        self.assertEqual(loop._internal_fds, 1)
        loop.close()

    def test_close_self_pipe(self):
        self.loop._close_self_pipe()
        self.assertEqual(self.loop._internal_fds, 0)
        self.assertTrue(self.ssock.close.called)
        self.assertTrue(self.csock.close.called)
        self.assertIsNone(self.loop._ssock)
        self.assertIsNone(self.loop._csock)

        # Don't call close(): _close_self_pipe() cannot be called twice
        self.loop._closed = True

    def test_close(self):
        self.loop._close_self_pipe = mock.Mock()
        self.loop.close()
        self.assertTrue(self.loop._close_self_pipe.called)
        self.assertTrue(self.proactor.close.called)
        self.assertIsNone(self.loop._proactor)

        # Closing an already closed loop must not close the pipe again.
        self.loop._close_self_pipe.reset_mock()
        self.loop.close()
        self.assertFalse(self.loop._close_self_pipe.called)

    def test_make_socket_transport(self):
        tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol())
        self.assertIsInstance(tr, _ProactorSocketTransport)
        close_transport(tr)

    def test_loop_self_reading(self):
        self.loop._loop_self_reading()
        self.proactor.recv.assert_called_with(self.ssock, 4096)
        self.proactor.recv.return_value.add_done_callback.assert_called_with(
            self.loop._loop_self_reading)

    def test_loop_self_reading_fut(self):
        fut = mock.Mock()
        self.loop._self_reading_future = fut
        self.loop._loop_self_reading(fut)
        self.assertTrue(fut.result.called)
        self.proactor.recv.assert_called_with(self.ssock, 4096)
        self.proactor.recv.return_value.add_done_callback.assert_called_with(
            self.loop._loop_self_reading)

    def test_loop_self_reading_exception(self):
        self.loop.call_exception_handler = mock.Mock()
        self.proactor.recv.side_effect = OSError()
        self.loop._loop_self_reading()
        self.assertTrue(self.loop.call_exception_handler.called)

    def test_write_to_self(self):
        self.loop._write_to_self()
        self.csock.send.assert_called_with(b'\0')

    def test_process_events(self):
        # Smoke test: must simply not raise.
        self.loop._process_events([])

    @mock.patch('asyncio.base_events.logger')
    def test_create_server(self, m_log):
        pf = mock.Mock()
        call_soon = self.loop.call_soon = mock.Mock()

        self.loop._start_serving(pf, self.sock)
        self.assertTrue(call_soon.called)

        # callback
        # ("loop" here is the accept callback handed to call_soon(),
        # not an event loop.)
        loop = call_soon.call_args[0][0]
        loop()
        self.proactor.accept.assert_called_with(self.sock)

        # conn
        fut = mock.Mock()
        fut.result.return_value = (mock.Mock(), mock.Mock())

        make_tr = self.loop._make_socket_transport = mock.Mock()
        loop(fut)
        self.assertTrue(fut.result.called)
        self.assertTrue(make_tr.called)

        # exception
        fut.result.side_effect = OSError()
        loop(fut)
        self.assertTrue(self.sock.close.called)
        self.assertTrue(m_log.error.called)

    def test_create_server_cancel(self):
        pf = mock.Mock()
        call_soon = self.loop.call_soon = mock.Mock()

        self.loop._start_serving(pf, self.sock)
        loop = call_soon.call_args[0][0]

        # cancelled
        fut = self.loop.create_future()
        fut.cancel()
        loop(fut)
        self.assertTrue(self.sock.close.called)

    def test_stop_serving(self):
        sock1 = mock.Mock()
        future1 = mock.Mock()
        sock2 = mock.Mock()
        future2 = mock.Mock()
        self.loop._accept_futures = {
            sock1.fileno(): future1,
            sock2.fileno(): future2
        }

        # Only the socket being stopped (and its accept future) is touched.
        self.loop._stop_serving(sock1)
        self.assertTrue(sock1.close.called)
        self.assertTrue(future1.cancel.called)
        self.proactor._stop_serving.assert_called_with(sock1)
        self.assertFalse(sock2.close.called)
        self.assertFalse(future2.cancel.called)

    def datagram_transport(self):
        """Return a datagram transport made by the loop for a fresh protocol.

        The mock protocol is stored on ``self.protocol``.
        """
        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
        return self.loop._make_datagram_transport(self.sock, self.protocol)

    def test_make_datagram_transport(self):
        tr = self.datagram_transport()
        self.assertIsInstance(tr, _ProactorDatagramTransport)
        self.assertIsInstance(tr, asyncio.DatagramTransport)
        close_transport(tr)

    def test_datagram_loop_writing(self):
        tr = self.datagram_transport()
        tr._buffer.appendleft((b'data', ('127.0.0.1', 12068)))
        tr._loop_writing()
        self.loop._proactor.sendto.assert_called_with(
            self.sock, b'data', addr=('127.0.0.1', 12068))
        self.loop._proactor.sendto.return_value.add_done_callback.\
            assert_called_with(tr._loop_writing)

        close_transport(tr)

    def test_datagram_loop_reading(self):
        tr = self.datagram_transport()
        tr._loop_reading()
        self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024)
        self.assertFalse(self.protocol.datagram_received.called)
        self.assertFalse(self.protocol.error_received.called)
        close_transport(tr)

    def test_datagram_loop_reading_data(self):
        res = self.loop.create_future()
        res.set_result((b'data', ('127.0.0.1', 12068)))

        tr = self.datagram_transport()
        tr._read_fut = res
        tr._loop_reading(res)
        self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024)
        self.protocol.datagram_received.assert_called_with(
            b'data', ('127.0.0.1', 12068))
        close_transport(tr)

    @unittest.skipIf(sys.flags.optimize,
                     "Assertions are disabled in optimized mode")
    def test_datagram_loop_reading_no_data(self):
        res = self.loop.create_future()
        res.set_result((b'', ('127.0.0.1', 12068)))

        tr = self.datagram_transport()
        # Passing the future before tr._read_fut refers to it trips an
        # assertion inside _loop_reading().
        self.assertRaises(AssertionError, tr._loop_reading, res)

        tr.close = mock.Mock()
        tr._read_fut = res
        tr._loop_reading(res)
        # An empty datagram neither closes the transport nor counts as
        # an error; reading continues.
        self.assertTrue(self.loop._proactor.recvfrom.called)
        self.assertFalse(self.protocol.error_received.called)
        self.assertFalse(tr.close.called)
        close_transport(tr)

    def test_datagram_loop_reading_aborted(self):
        err = self.loop._proactor.recvfrom.side_effect = (
            ConnectionAbortedError())

        tr = self.datagram_transport()
        tr._fatal_error = mock.Mock()
        tr._protocol.error_received = mock.Mock()
        tr._loop_reading()
        tr._protocol.error_received.assert_called_with(err)
        close_transport(tr)

    def test_datagram_loop_writing_aborted(self):
        err = self.loop._proactor.sendto.side_effect = (
            ConnectionAbortedError())

        tr = self.datagram_transport()
        tr._fatal_error = mock.Mock()
        tr._protocol.error_received = mock.Mock()
        tr._buffer.appendleft((b'Hello', ('127.0.0.1', 12068)))
        tr._loop_writing()
        tr._protocol.error_received.assert_called_with(err)
        close_transport(tr)
960
961
@unittest.skipIf(sys.platform != 'win32',
                 'Proactor is supported on Windows only')
class ProactorEventLoopUnixSockSendfileTests(test_utils.TestCase):
    """Tests for the loop's _sock_sendfile_native() rejecting bad file objects.

    These tests use a real ProactorEventLoop and real sockets.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB

    class MyProto(asyncio.Protocol):
        """Server-side protocol recording connection state and received data."""

        def __init__(self, loop):
            self.started = False
            self.closed = False
            self.data = bytearray()
            # Resolved by connection_lost(); awaited by wait_closed().
            self.fut = loop.create_future()
            self.transport = None

        def connection_made(self, transport):
            self.started = True
            self.transport = transport

        def data_received(self, data):
            self.data.extend(data)

        def connection_lost(self, exc):
            self.closed = True
            self.fut.set_result(None)

        async def wait_closed(self):
            await self.fut

    @classmethod
    def setUpClass(cls):
        # Create the file whose contents the tests open in setUp().
        with open(os_helper.TESTFN, 'wb') as fp:
            fp.write(cls.DATA)
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(os_helper.TESTFN)
        super().tearDownClass()

    def setUp(self):
        self.loop = asyncio.ProactorEventLoop()
        self.set_event_loop(self.loop)
        self.addCleanup(self.loop.close)
        self.file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(self.file.close)
        super().setUp()

    def make_socket(self, cleanup=True):
        """Return a non-blocking TCP socket with 1024-byte send/receive buffers.

        When *cleanup* is true the socket is closed at test teardown.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
        if cleanup:
            self.addCleanup(sock.close)
        return sock

    def run_loop(self, coro):
        """Run *coro* to completion on the test loop and return its result."""
        return self.loop.run_until_complete(coro)

    def prepare(self):
        """Start a local server and connect a client socket to it.

        Returns a ``(sock, proto)`` pair: the connected client socket and
        the MyProto instance used by the server.  A cleanup closing the
        server side is registered.
        """
        sock = self.make_socket()
        proto = self.MyProto(self.loop)
        port = socket_helper.find_unused_port()
        # No socket-level cleanup here: the server is closed by the
        # cleanup() callback registered below.
        srv_sock = self.make_socket(cleanup=False)
        srv_sock.bind(('127.0.0.1', port))
        server = self.run_loop(self.loop.create_server(
            lambda: proto, sock=srv_sock))
        self.run_loop(self.loop.sock_connect(sock, srv_sock.getsockname()))

        def cleanup():
            if proto.transport is not None:
                # can be None if the task was cancelled before
                # connection_made callback
                proto.transport.close()
                self.run_loop(proto.wait_closed())

            server.close()
            self.run_loop(server.wait_closed())

        self.addCleanup(cleanup)

        return sock, proto

    def test_sock_sendfile_not_a_file(self):
        sock, proto = self.prepare()
        f = object()
        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "not a regular file"):
            self.run_loop(self.loop._sock_sendfile_native(sock, f,
                                                          0, None))
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_iobuffer(self):
        sock, proto = self.prepare()
        f = io.BytesIO()
        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "not a regular file"):
            self.run_loop(self.loop._sock_sendfile_native(sock, f,
                                                          0, None))
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_not_regular_file(self):
        sock, proto = self.prepare()
        f = mock.Mock()
        f.fileno.return_value = -1
        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "not a regular file"):
            self.run_loop(self.loop._sock_sendfile_native(sock, f,
                                                          0, None))
        self.assertEqual(self.file.tell(), 0)
1072
1073
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()