python (3.12.0)
1 """Tests for selector_events.py"""
2
3 import collections
4 import selectors
5 import socket
6 import sys
7 import unittest
8 from asyncio import selector_events
9 from unittest import mock
10
11 try:
12 import ssl
13 except ImportError:
14 ssl = None
15
16 import asyncio
17 from asyncio.selector_events import (BaseSelectorEventLoop,
18 _SelectorDatagramTransport,
19 _SelectorSocketTransport,
20 _SelectorTransport)
21 from test.test_asyncio import utils as test_utils
22
# Shorthand for mock.ANY; used in call assertions where the exact argument
# value (e.g. the exception instance / traceback in exc_info) is irrelevant.
MOCK_ANY = mock.ANY
24
25
def tearDownModule():
    """unittest module-level teardown hook: clear the asyncio loop policy."""
    # Passing None asks asyncio to fall back to its default policy.
    default_policy = None
    asyncio.set_event_loop_policy(default_policy)
28
29
class TestBaseSelectorEventLoop(BaseSelectorEventLoop):
    """BaseSelectorEventLoop whose self-pipe is replaced by mock sockets."""

    def _make_self_pipe(self):
        # Stand-in socket pair: no real file descriptors are opened.
        self._ssock, self._csock = mock.Mock(), mock.Mock()
        self._internal_fds = self._internal_fds + 1

    def _close_self_pipe(self):
        # Nothing real was opened in _make_self_pipe(), so nothing to close.
        return None
39
40
def list_to_buffer(l=()):
    """Return a deque containing a memoryview of each item of *l*, in order."""
    return collections.deque(map(memoryview, l))
45
46
47
def close_transport(transport):
    """Close the transport's socket directly and detach it; no-op if already gone."""
    sock = transport._sock
    if sock is not None:
        # transport.close() is deliberately avoided: the event loop and the
        # selector are mocked.
        sock.close()
        transport._sock = None
55
56
class BaseSelectorEventLoopTests(test_utils.TestCase):
    """Tests for BaseSelectorEventLoop driven through a mocked selector."""

    def setUp(self):
        super().setUp()
        # The selector is a Mock that never reports ready events.
        self.selector = mock.Mock()
        self.selector.select.return_value = []
        self.loop = TestBaseSelectorEventLoop(self.selector)
        self.set_event_loop(self.loop)

    def test_make_socket_transport(self):
        m = mock.Mock()
        self.loop.add_reader = mock.Mock()
        self.loop._ensure_fd_no_transport = mock.Mock()
        transport = self.loop._make_socket_transport(m, asyncio.Protocol())
        self.assertIsInstance(transport, _SelectorSocketTransport)
        self.assertEqual(self.loop._ensure_fd_no_transport.call_count, 1)

        # Calling repr() must not fail when the event loop is closed
        self.loop.close()
        repr(transport)

        close_transport(transport)

    @mock.patch('asyncio.selector_events.ssl', None)
    @mock.patch('asyncio.sslproto.ssl', None)
    def test_make_ssl_transport_without_ssl_error(self):
        m = mock.Mock()
        self.loop.add_reader = mock.Mock()
        self.loop.add_writer = mock.Mock()
        self.loop.remove_reader = mock.Mock()
        self.loop.remove_writer = mock.Mock()
        self.loop._ensure_fd_no_transport = mock.Mock()
        with self.assertRaises(RuntimeError):
            self.loop._make_ssl_transport(m, m, m, m)
        self.assertEqual(self.loop._ensure_fd_no_transport.call_count, 1)

    def test_close(self):
        # Local loop class that keeps the real _close_self_pipe() so that
        # closing the self-pipe sockets can be observed.
        class EventLoop(BaseSelectorEventLoop):
            def _make_self_pipe(self):
                self._ssock = mock.Mock()
                self._csock = mock.Mock()
                self._internal_fds += 1

        self.loop = EventLoop(self.selector)
        self.set_event_loop(self.loop)

        ssock = self.loop._ssock
        ssock.fileno.return_value = 7
        csock = self.loop._csock
        csock.fileno.return_value = 1
        remove_reader = self.loop._remove_reader = mock.Mock()

        self.loop._selector.close()
        self.loop._selector = selector = mock.Mock()
        self.assertFalse(self.loop.is_closed())

        self.loop.close()
        self.assertTrue(self.loop.is_closed())
        self.assertIsNone(self.loop._selector)
        self.assertIsNone(self.loop._csock)
        self.assertIsNone(self.loop._ssock)
        selector.close.assert_called_with()
        ssock.close.assert_called_with()
        csock.close.assert_called_with()
        remove_reader.assert_called_with(7)

        # it should be possible to call close() more than once
        self.loop.close()
        self.loop.close()

        # operation blocked when the loop is closed
        f = self.loop.create_future()
        self.assertRaises(RuntimeError, self.loop.run_forever)
        self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
        fd = 0

        def callback():
            pass

        self.assertRaises(RuntimeError, self.loop.add_reader, fd, callback)
        self.assertRaises(RuntimeError, self.loop.add_writer, fd, callback)

    def test_close_no_selector(self):
        self.loop.remove_reader = mock.Mock()
        self.loop._selector.close()
        self.loop._selector = None
        self.loop.close()
        self.assertIsNone(self.loop._selector)

    def test_read_from_self_tryagain(self):
        self.loop._ssock.recv.side_effect = BlockingIOError
        self.assertIsNone(self.loop._read_from_self())

    def test_read_from_self_exception(self):
        self.loop._ssock.recv.side_effect = OSError
        self.assertRaises(OSError, self.loop._read_from_self)

    def test_write_to_self_tryagain(self):
        self.loop._csock.send.side_effect = BlockingIOError
        with test_utils.disable_logger():
            self.assertIsNone(self.loop._write_to_self())

    def test_write_to_self_exception(self):
        # _write_to_self() swallows OSError only; any other exception
        # (here RuntimeError) must propagate to the caller.
        self.loop._csock.send.side_effect = RuntimeError()
        self.assertRaises(RuntimeError, self.loop._write_to_self)

    @mock.patch('socket.getaddrinfo')
    def test_sock_connect_resolve_using_socket_params(self, m_gai):
        addr = ('need-resolution.com', 8080)
        for sock_type in [socket.SOCK_STREAM, socket.SOCK_DGRAM]:
            with self.subTest(sock_type):
                sock = test_utils.mock_nonblocking_socket(type=sock_type)

                m_gai.side_effect = \
                    lambda *args: [(None, None, None, None, ('127.0.0.1', 0))]

                con = self.loop.create_task(self.loop.sock_connect(sock, addr))
                self.loop.run_until_complete(con)
                # Resolution must use the socket's own family/type/proto.
                m_gai.assert_called_with(
                    addr[0], addr[1], sock.family, sock.type, sock.proto, 0)

                self.loop.run_until_complete(con)
                sock.connect.assert_called_with(('127.0.0.1', 0))

    def test_add_reader(self):
        # KeyError from get_key() means the fd is not registered yet.
        self.loop._selector.get_key.side_effect = KeyError
        cb = lambda: True
        self.loop.add_reader(1, cb)

        self.assertTrue(self.loop._selector.register.called)
        fd, mask, (r, w) = self.loop._selector.register.call_args[0]
        self.assertEqual(1, fd)
        self.assertEqual(selectors.EVENT_READ, mask)
        self.assertEqual(cb, r._callback)
        self.assertIsNone(w)

    def test_add_reader_existing(self):
        reader = mock.Mock()
        writer = mock.Mock()
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_WRITE, (reader, writer))
        cb = lambda: True
        self.loop.add_reader(1, cb)

        # The previous reader handle is cancelled and the key is modified
        # in place rather than registered again.
        self.assertTrue(reader.cancel.called)
        self.assertFalse(self.loop._selector.register.called)
        self.assertTrue(self.loop._selector.modify.called)
        fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
        self.assertEqual(1, fd)
        self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
        self.assertEqual(cb, r._callback)
        self.assertEqual(writer, w)

    def test_add_reader_existing_writer(self):
        writer = mock.Mock()
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_WRITE, (None, writer))
        cb = lambda: True
        self.loop.add_reader(1, cb)

        self.assertFalse(self.loop._selector.register.called)
        self.assertTrue(self.loop._selector.modify.called)
        fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
        self.assertEqual(1, fd)
        self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
        self.assertEqual(cb, r._callback)
        self.assertEqual(writer, w)

    def test_remove_reader(self):
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_READ, (None, None))
        self.assertFalse(self.loop.remove_reader(1))

        self.assertTrue(self.loop._selector.unregister.called)

    def test_remove_reader_read_write(self):
        reader = mock.Mock()
        writer = mock.Mock()
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_READ | selectors.EVENT_WRITE,
            (reader, writer))
        self.assertTrue(
            self.loop.remove_reader(1))

        # The writer is still registered, so the key is modified, not dropped.
        self.assertFalse(self.loop._selector.unregister.called)
        self.assertEqual(
            (1, selectors.EVENT_WRITE, (None, writer)),
            self.loop._selector.modify.call_args[0])

    def test_remove_reader_unknown(self):
        self.loop._selector.get_key.side_effect = KeyError
        self.assertFalse(
            self.loop.remove_reader(1))

    def test_add_writer(self):
        self.loop._selector.get_key.side_effect = KeyError
        cb = lambda: True
        self.loop.add_writer(1, cb)

        self.assertTrue(self.loop._selector.register.called)
        fd, mask, (r, w) = self.loop._selector.register.call_args[0]
        self.assertEqual(1, fd)
        self.assertEqual(selectors.EVENT_WRITE, mask)
        self.assertIsNone(r)
        self.assertEqual(cb, w._callback)

    def test_add_writer_existing(self):
        reader = mock.Mock()
        writer = mock.Mock()
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_READ, (reader, writer))
        cb = lambda: True
        self.loop.add_writer(1, cb)

        self.assertTrue(writer.cancel.called)
        self.assertFalse(self.loop._selector.register.called)
        self.assertTrue(self.loop._selector.modify.called)
        fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
        self.assertEqual(1, fd)
        self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
        self.assertEqual(reader, r)
        self.assertEqual(cb, w._callback)

    def test_remove_writer(self):
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_WRITE, (None, None))
        self.assertFalse(self.loop.remove_writer(1))

        self.assertTrue(self.loop._selector.unregister.called)

    def test_remove_writer_read_write(self):
        reader = mock.Mock()
        writer = mock.Mock()
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_READ | selectors.EVENT_WRITE,
            (reader, writer))
        self.assertTrue(
            self.loop.remove_writer(1))

        # The reader is still registered, so the key is modified, not dropped.
        self.assertFalse(self.loop._selector.unregister.called)
        self.assertEqual(
            (1, selectors.EVENT_READ, (reader, None)),
            self.loop._selector.modify.call_args[0])

    def test_remove_writer_unknown(self):
        self.loop._selector.get_key.side_effect = KeyError
        self.assertFalse(
            self.loop.remove_writer(1))

    def test_process_events_read(self):
        reader = mock.Mock()
        reader._cancelled = False

        self.loop._add_callback = mock.Mock()
        self.loop._process_events(
            [(selectors.SelectorKey(
                1, 1, selectors.EVENT_READ, (reader, None)),
              selectors.EVENT_READ)])
        self.assertTrue(self.loop._add_callback.called)
        self.loop._add_callback.assert_called_with(reader)

    def test_process_events_read_cancelled(self):
        reader = mock.Mock()
        # Set the private flag explicitly (as the non-cancelled test does)
        # instead of relying on an auto-created Mock attribute being truthy.
        reader._cancelled = True

        self.loop._remove_reader = mock.Mock()
        self.loop._process_events(
            [(selectors.SelectorKey(
                1, 1, selectors.EVENT_READ, (reader, None)),
              selectors.EVENT_READ)])
        self.loop._remove_reader.assert_called_with(1)

    def test_process_events_write(self):
        writer = mock.Mock()
        writer._cancelled = False

        self.loop._add_callback = mock.Mock()
        self.loop._process_events(
            [(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
                                    (None, writer)),
              selectors.EVENT_WRITE)])
        self.loop._add_callback.assert_called_with(writer)

    def test_process_events_write_cancelled(self):
        writer = mock.Mock()
        # Set the private flag explicitly (as the non-cancelled test does)
        # instead of relying on an auto-created Mock attribute being truthy.
        writer._cancelled = True
        self.loop._remove_writer = mock.Mock()

        self.loop._process_events(
            [(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
                                    (None, writer)),
              selectors.EVENT_WRITE)])
        self.loop._remove_writer.assert_called_with(1)

    def test_accept_connection_multiple(self):
        sock = mock.Mock()
        sock.accept.return_value = (mock.Mock(), mock.Mock())
        backlog = 100
        # Mock the coroutine generation for a connection to prevent
        # warnings related to un-awaited coroutines. _accept_connection2
        # is an async function that is patched with AsyncMock. create_task
        # creates a task out of coroutine returned by AsyncMock, so use
        # asyncio.sleep(0) to ensure created tasks are complete to avoid
        # task pending warnings.
        with mock.patch.object(self.loop, '_accept_connection2'):
            self.loop._accept_connection(
                mock.Mock(), sock, backlog=backlog)
        self.loop.run_until_complete(asyncio.sleep(0))
        self.assertEqual(sock.accept.call_count, backlog)
366
367
class SelectorTransportTests(test_utils.TestCase):
    """Tests for the _SelectorTransport base class using a mocked socket."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        self.sock = mock.Mock(socket.socket)
        self.sock.fileno.return_value = 7

    def create_transport(self):
        """Build a _SelectorTransport over the mocked socket; cleaned up automatically."""
        transport = _SelectorTransport(self.loop, self.sock, self.protocol,
                                       None)
        self.addCleanup(close_transport, transport)
        return transport

    def test_ctor(self):
        tr = self.create_transport()
        self.assertIs(tr._loop, self.loop)
        self.assertIs(tr._sock, self.sock)
        self.assertIs(tr._sock_fd, 7)

    def test_abort(self):
        tr = self.create_transport()
        tr._force_close = mock.Mock()

        tr.abort()
        tr._force_close.assert_called_with(None)

    def test_close(self):
        tr = self.create_transport()
        tr.close()

        self.assertTrue(tr.is_closing())
        self.assertEqual(1, self.loop.remove_reader_count[7])
        # connection_lost() is scheduled via the loop; let it run and verify
        # it was really invoked (calling the mock directly asserts nothing).
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertEqual(tr._conn_lost, 1)

        # A second close() must be a no-op.
        tr.close()
        self.assertEqual(tr._conn_lost, 1)
        self.assertEqual(1, self.loop.remove_reader_count[7])

    def test_close_write_buffer(self):
        tr = self.create_transport()
        # The write buffer holds bytes-like chunks, so append one chunk.
        tr._buffer.append(b'data')
        tr.close()

        # With pending data, connection_lost() must not be called yet.
        self.assertFalse(self.loop.readers)
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_force_close(self):
        tr = self.create_transport()
        tr._buffer.append(b'1')
        self.loop._add_reader(7, mock.sentinel)
        self.loop._add_writer(7, mock.sentinel)
        tr._force_close(None)

        self.assertTrue(tr.is_closing())
        self.assertEqual(tr._buffer, list_to_buffer())
        self.assertFalse(self.loop.readers)
        self.assertFalse(self.loop.writers)

        # second close should not remove reader
        tr._force_close(None)
        self.assertFalse(self.loop.readers)
        self.assertEqual(1, self.loop.remove_reader_count[7])

    @mock.patch('asyncio.log.logger.error')
    def test_fatal_error(self, m_exc):
        exc = OSError()
        tr = self.create_transport()
        tr._force_close = mock.Mock()
        tr._fatal_error(exc)

        # An OSError must not be logged as an error.
        m_exc.assert_not_called()

        tr._force_close.assert_called_with(exc)

    @mock.patch('asyncio.log.logger.error')
    def test_fatal_error_custom_exception(self, m_exc):
        class MyError(Exception):
            pass
        exc = MyError()
        tr = self.create_transport()
        tr._force_close = mock.Mock()
        tr._fatal_error(exc)

        m_exc.assert_called_with(
            test_utils.MockPattern(
                'Fatal error on transport\nprotocol:.*\ntransport:.*'),
            exc_info=(MyError, MOCK_ANY, MOCK_ANY))

        tr._force_close.assert_called_with(exc)

    def test_connection_lost(self):
        exc = OSError()
        tr = self.create_transport()
        self.assertIsNotNone(tr._protocol)
        self.assertIsNotNone(tr._loop)
        tr._call_connection_lost(exc)

        self.protocol.connection_lost.assert_called_with(exc)
        self.sock.close.assert_called_with()
        self.assertIsNone(tr._sock)

        # References are dropped once the connection is lost.
        self.assertIsNone(tr._protocol)
        self.assertIsNone(tr._loop)

    def test__add_reader(self):
        tr = self.create_transport()
        tr._buffer.append(b'1')
        tr._add_reader(7, mock.sentinel)
        self.assertTrue(self.loop.readers)

        tr._force_close(None)

        self.assertTrue(tr.is_closing())
        self.assertFalse(self.loop.readers)

        # can not add readers after closing
        tr._add_reader(7, mock.sentinel)
        self.assertFalse(self.loop.readers)
490
491
492 class ESC[4;38;5;81mSelectorSocketTransportTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
493
494 def setUp(self):
495 super().setUp()
496 self.loop = self.new_test_loop()
497 self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
498 self.sock = mock.Mock(socket.socket)
499 self.sock_fd = self.sock.fileno.return_value = 7
500
501 def socket_transport(self, waiter=None, sendmsg=False):
502 transport = _SelectorSocketTransport(self.loop, self.sock,
503 self.protocol, waiter=waiter)
504 if sendmsg:
505 transport._write_ready = transport._write_sendmsg
506 else:
507 transport._write_ready = transport._write_send
508 self.addCleanup(close_transport, transport)
509 return transport
510
511 def test_ctor(self):
512 waiter = self.loop.create_future()
513 tr = self.socket_transport(waiter=waiter)
514 self.loop.run_until_complete(waiter)
515
516 self.loop.assert_reader(7, tr._read_ready)
517 test_utils.run_briefly(self.loop)
518 self.protocol.connection_made.assert_called_with(tr)
519
520 def test_ctor_with_waiter(self):
521 waiter = self.loop.create_future()
522 self.socket_transport(waiter=waiter)
523 self.loop.run_until_complete(waiter)
524
525 self.assertIsNone(waiter.result())
526
527 def test_pause_resume_reading(self):
528 tr = self.socket_transport()
529 test_utils.run_briefly(self.loop)
530 self.assertFalse(tr._paused)
531 self.assertTrue(tr.is_reading())
532 self.loop.assert_reader(7, tr._read_ready)
533
534 tr.pause_reading()
535 tr.pause_reading()
536 self.assertTrue(tr._paused)
537 self.assertFalse(tr.is_reading())
538 self.loop.assert_no_reader(7)
539
540 tr.resume_reading()
541 tr.resume_reading()
542 self.assertFalse(tr._paused)
543 self.assertTrue(tr.is_reading())
544 self.loop.assert_reader(7, tr._read_ready)
545
546 tr.close()
547 self.assertFalse(tr.is_reading())
548 self.loop.assert_no_reader(7)
549
550 def test_pause_reading_connection_made(self):
551 tr = self.socket_transport()
552 self.protocol.connection_made.side_effect = lambda _: tr.pause_reading()
553 test_utils.run_briefly(self.loop)
554 self.assertFalse(tr.is_reading())
555 self.loop.assert_no_reader(7)
556
557 tr.resume_reading()
558 self.assertTrue(tr.is_reading())
559 self.loop.assert_reader(7, tr._read_ready)
560
561 tr.close()
562 self.assertFalse(tr.is_reading())
563 self.loop.assert_no_reader(7)
564
565
566 def test_read_eof_received_error(self):
567 transport = self.socket_transport()
568 transport.close = mock.Mock()
569 transport._fatal_error = mock.Mock()
570
571 self.loop.call_exception_handler = mock.Mock()
572
573 self.protocol.eof_received.side_effect = LookupError()
574
575 self.sock.recv.return_value = b''
576 transport._read_ready()
577
578 self.protocol.eof_received.assert_called_with()
579 self.assertTrue(transport._fatal_error.called)
580
581 def test_data_received_error(self):
582 transport = self.socket_transport()
583 transport._fatal_error = mock.Mock()
584
585 self.loop.call_exception_handler = mock.Mock()
586 self.protocol.data_received.side_effect = LookupError()
587
588 self.sock.recv.return_value = b'data'
589 transport._read_ready()
590
591 self.assertTrue(transport._fatal_error.called)
592 self.assertTrue(self.protocol.data_received.called)
593
594 def test_read_ready(self):
595 transport = self.socket_transport()
596
597 self.sock.recv.return_value = b'data'
598 transport._read_ready()
599
600 self.protocol.data_received.assert_called_with(b'data')
601
602 def test_read_ready_eof(self):
603 transport = self.socket_transport()
604 transport.close = mock.Mock()
605
606 self.sock.recv.return_value = b''
607 transport._read_ready()
608
609 self.protocol.eof_received.assert_called_with()
610 transport.close.assert_called_with()
611
612 def test_read_ready_eof_keep_open(self):
613 transport = self.socket_transport()
614 transport.close = mock.Mock()
615
616 self.sock.recv.return_value = b''
617 self.protocol.eof_received.return_value = True
618 transport._read_ready()
619
620 self.protocol.eof_received.assert_called_with()
621 self.assertFalse(transport.close.called)
622
623 @mock.patch('logging.exception')
624 def test_read_ready_tryagain(self, m_exc):
625 self.sock.recv.side_effect = BlockingIOError
626
627 transport = self.socket_transport()
628 transport._fatal_error = mock.Mock()
629 transport._read_ready()
630
631 self.assertFalse(transport._fatal_error.called)
632
633 @mock.patch('logging.exception')
634 def test_read_ready_tryagain_interrupted(self, m_exc):
635 self.sock.recv.side_effect = InterruptedError
636
637 transport = self.socket_transport()
638 transport._fatal_error = mock.Mock()
639 transport._read_ready()
640
641 self.assertFalse(transport._fatal_error.called)
642
643 @mock.patch('logging.exception')
644 def test_read_ready_conn_reset(self, m_exc):
645 err = self.sock.recv.side_effect = ConnectionResetError()
646
647 transport = self.socket_transport()
648 transport._force_close = mock.Mock()
649 with test_utils.disable_logger():
650 transport._read_ready()
651 transport._force_close.assert_called_with(err)
652
653 @mock.patch('logging.exception')
654 def test_read_ready_err(self, m_exc):
655 err = self.sock.recv.side_effect = OSError()
656
657 transport = self.socket_transport()
658 transport._fatal_error = mock.Mock()
659 transport._read_ready()
660
661 transport._fatal_error.assert_called_with(
662 err,
663 'Fatal read error on socket transport')
664
665 def test_write(self):
666 data = b'data'
667 self.sock.send.return_value = len(data)
668
669 transport = self.socket_transport()
670 transport.write(data)
671 self.sock.send.assert_called_with(data)
672
673 def test_write_bytearray(self):
674 data = bytearray(b'data')
675 self.sock.send.return_value = len(data)
676
677 transport = self.socket_transport()
678 transport.write(data)
679 self.sock.send.assert_called_with(data)
680 self.assertEqual(data, bytearray(b'data')) # Hasn't been mutated.
681
682 def test_write_memoryview(self):
683 data = memoryview(b'data')
684 self.sock.send.return_value = len(data)
685
686 transport = self.socket_transport()
687 transport.write(data)
688 self.sock.send.assert_called_with(data)
689
690 def test_write_no_data(self):
691 transport = self.socket_transport()
692 transport._buffer.append(memoryview(b'data'))
693 transport.write(b'')
694 self.assertFalse(self.sock.send.called)
695 self.assertEqual(list_to_buffer([b'data']), transport._buffer)
696
697 def test_write_buffer(self):
698 transport = self.socket_transport()
699 transport._buffer.append(b'data1')
700 transport.write(b'data2')
701 self.assertFalse(self.sock.send.called)
702 self.assertEqual(list_to_buffer([b'data1', b'data2']),
703 transport._buffer)
704
705 def test_write_partial(self):
706 data = b'data'
707 self.sock.send.return_value = 2
708
709 transport = self.socket_transport()
710 transport.write(data)
711
712 self.loop.assert_writer(7, transport._write_ready)
713 self.assertEqual(list_to_buffer([b'ta']), transport._buffer)
714
715 def test_write_partial_bytearray(self):
716 data = bytearray(b'data')
717 self.sock.send.return_value = 2
718
719 transport = self.socket_transport()
720 transport.write(data)
721
722 self.loop.assert_writer(7, transport._write_ready)
723 self.assertEqual(list_to_buffer([b'ta']), transport._buffer)
724 self.assertEqual(data, bytearray(b'data')) # Hasn't been mutated.
725
726 def test_write_partial_memoryview(self):
727 data = memoryview(b'data')
728 self.sock.send.return_value = 2
729
730 transport = self.socket_transport()
731 transport.write(data)
732
733 self.loop.assert_writer(7, transport._write_ready)
734 self.assertEqual(list_to_buffer([b'ta']), transport._buffer)
735
736 def test_write_partial_none(self):
737 data = b'data'
738 self.sock.send.return_value = 0
739 self.sock.fileno.return_value = 7
740
741 transport = self.socket_transport()
742 transport.write(data)
743
744 self.loop.assert_writer(7, transport._write_ready)
745 self.assertEqual(list_to_buffer([b'data']), transport._buffer)
746
747 def test_write_tryagain(self):
748 self.sock.send.side_effect = BlockingIOError
749
750 data = b'data'
751 transport = self.socket_transport()
752 transport.write(data)
753
754 self.loop.assert_writer(7, transport._write_ready)
755 self.assertEqual(list_to_buffer([b'data']), transport._buffer)
756
757 def test_write_sendmsg_no_data(self):
758 self.sock.sendmsg = mock.Mock()
759 self.sock.sendmsg.return_value = 0
760 transport = self.socket_transport(sendmsg=True)
761 transport._buffer.append(memoryview(b'data'))
762 transport.write(b'')
763 self.assertFalse(self.sock.sendmsg.called)
764 self.assertEqual(list_to_buffer([b'data']), transport._buffer)
765
766 @unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg')
767 def test_writelines_sendmsg_full(self):
768 data = memoryview(b'data')
769 self.sock.sendmsg = mock.Mock()
770 self.sock.sendmsg.return_value = len(data)
771
772 transport = self.socket_transport(sendmsg=True)
773 transport.writelines([data])
774 self.assertTrue(self.sock.sendmsg.called)
775 self.assertFalse(self.loop.writers)
776
777 @unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg')
778 def test_writelines_sendmsg_partial(self):
779 data = memoryview(b'data')
780 self.sock.sendmsg = mock.Mock()
781 self.sock.sendmsg.return_value = 2
782
783 transport = self.socket_transport(sendmsg=True)
784 transport.writelines([data])
785 self.assertTrue(self.sock.sendmsg.called)
786 self.assertTrue(self.loop.writers)
787
788 def test_writelines_send_full(self):
789 data = memoryview(b'data')
790 self.sock.send.return_value = len(data)
791 self.sock.send.fileno.return_value = 7
792
793 transport = self.socket_transport()
794 transport.writelines([data])
795 self.assertTrue(self.sock.send.called)
796 self.assertFalse(self.loop.writers)
797
798 def test_writelines_send_partial(self):
799 data = memoryview(b'data')
800 self.sock.send.return_value = 2
801 self.sock.send.fileno.return_value = 7
802
803 transport = self.socket_transport()
804 transport.writelines([data])
805 self.assertTrue(self.sock.send.called)
806 self.assertTrue(self.loop.writers)
807
808 @unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg')
809 def test_write_sendmsg_full(self):
810 data = memoryview(b'data')
811 self.sock.sendmsg = mock.Mock()
812 self.sock.sendmsg.return_value = len(data)
813
814 transport = self.socket_transport(sendmsg=True)
815 transport._buffer.append(data)
816 self.loop._add_writer(7, transport._write_ready)
817 transport._write_ready()
818 self.assertTrue(self.sock.sendmsg.called)
819 self.assertFalse(self.loop.writers)
820
821 @unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg')
822 def test_write_sendmsg_partial(self):
823
824 data = memoryview(b'data')
825 self.sock.sendmsg = mock.Mock()
826 # Sent partial data
827 self.sock.sendmsg.return_value = 2
828
829 transport = self.socket_transport(sendmsg=True)
830 transport._buffer.append(data)
831 self.loop._add_writer(7, transport._write_ready)
832 transport._write_ready()
833 self.assertTrue(self.sock.sendmsg.called)
834 self.assertTrue(self.loop.writers)
835 self.assertEqual(list_to_buffer([b'ta']), transport._buffer)
836
837 @unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg')
838 def test_write_sendmsg_half_buffer(self):
839 data = [memoryview(b'data1'), memoryview(b'data2')]
840 self.sock.sendmsg = mock.Mock()
841 # Sent partial data
842 self.sock.sendmsg.return_value = 2
843
844 transport = self.socket_transport(sendmsg=True)
845 transport._buffer.extend(data)
846 self.loop._add_writer(7, transport._write_ready)
847 transport._write_ready()
848 self.assertTrue(self.sock.sendmsg.called)
849 self.assertTrue(self.loop.writers)
850 self.assertEqual(list_to_buffer([b'ta1', b'data2']), transport._buffer)
851
852 @unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg')
853 def test_write_sendmsg_OSError(self):
854 data = memoryview(b'data')
855 self.sock.sendmsg = mock.Mock()
856 err = self.sock.sendmsg.side_effect = OSError()
857
858 transport = self.socket_transport(sendmsg=True)
859 transport._fatal_error = mock.Mock()
860 transport._buffer.extend(data)
861 # Calls _fatal_error and clears the buffer
862 transport._write_ready()
863 self.assertTrue(self.sock.sendmsg.called)
864 self.assertFalse(self.loop.writers)
865 self.assertEqual(list_to_buffer([]), transport._buffer)
866 transport._fatal_error.assert_called_with(
867 err,
868 'Fatal write error on socket transport')
869
870 @mock.patch('asyncio.selector_events.logger')
871 def test_write_exception(self, m_log):
872 err = self.sock.send.side_effect = OSError()
873
874 data = b'data'
875 transport = self.socket_transport()
876 transport._fatal_error = mock.Mock()
877 transport.write(data)
878 transport._fatal_error.assert_called_with(
879 err,
880 'Fatal write error on socket transport')
881 transport._conn_lost = 1
882
883 self.sock.reset_mock()
884 transport.write(data)
885 self.assertFalse(self.sock.send.called)
886 self.assertEqual(transport._conn_lost, 2)
887 transport.write(data)
888 transport.write(data)
889 transport.write(data)
890 transport.write(data)
891 m_log.warning.assert_called_with('socket.send() raised exception.')
892
893 def test_write_str(self):
894 transport = self.socket_transport()
895 self.assertRaises(TypeError, transport.write, 'str')
896
897 def test_write_closing(self):
898 transport = self.socket_transport()
899 transport.close()
900 self.assertEqual(transport._conn_lost, 1)
901 transport.write(b'data')
902 self.assertEqual(transport._conn_lost, 2)
903
904 def test_write_ready(self):
905 data = b'data'
906 self.sock.send.return_value = len(data)
907
908 transport = self.socket_transport()
909 transport._buffer.append(data)
910 self.loop._add_writer(7, transport._write_ready)
911 transport._write_ready()
912 self.assertTrue(self.sock.send.called)
913 self.assertFalse(self.loop.writers)
914
915 def test_write_ready_closing(self):
916 data = memoryview(b'data')
917 self.sock.send.return_value = len(data)
918
919 transport = self.socket_transport()
920 transport._closing = True
921 transport._buffer.append(data)
922 self.loop._add_writer(7, transport._write_ready)
923 transport._write_ready()
924 self.assertTrue(self.sock.send.called)
925 self.assertFalse(self.loop.writers)
926 self.sock.close.assert_called_with()
927 self.protocol.connection_lost.assert_called_with(None)
928
929 @unittest.skipIf(sys.flags.optimize, "Assertions are disabled in optimized mode")
930 def test_write_ready_no_data(self):
931 transport = self.socket_transport()
932 # This is an internal error.
933 self.assertRaises(AssertionError, transport._write_ready)
934
935 def test_write_ready_partial(self):
936 data = memoryview(b'data')
937 self.sock.send.return_value = 2
938
939 transport = self.socket_transport()
940 transport._buffer.append(data)
941 self.loop._add_writer(7, transport._write_ready)
942 transport._write_ready()
943 self.loop.assert_writer(7, transport._write_ready)
944 self.assertEqual(list_to_buffer([b'ta']), transport._buffer)
945
946 def test_write_ready_partial_none(self):
947 data = b'data'
948 self.sock.send.return_value = 0
949
950 transport = self.socket_transport()
951 transport._buffer.append(data)
952 self.loop._add_writer(7, transport._write_ready)
953 transport._write_ready()
954 self.loop.assert_writer(7, transport._write_ready)
955 self.assertEqual(list_to_buffer([b'data']), transport._buffer)
956
957 def test_write_ready_tryagain(self):
958 self.sock.send.side_effect = BlockingIOError
959
960 transport = self.socket_transport()
961 buffer = list_to_buffer([b'data1', b'data2'])
962 transport._buffer = buffer
963 self.loop._add_writer(7, transport._write_ready)
964 transport._write_ready()
965
966 self.loop.assert_writer(7, transport._write_ready)
967 self.assertEqual(buffer, transport._buffer)
968
def test_write_ready_exception(self):
    # An OSError raised by send() must be handed to _fatal_error() together
    # with the socket-transport write message.
    err = self.sock.send.side_effect = OSError()

    transport = self.socket_transport()
    transport._fatal_error = mock.Mock()
    # Queue the payload as ONE chunk, the way the sibling _write_ready
    # tests do.  extend() would iterate the bytes object and enqueue four
    # separate ints rather than a single buffer.
    transport._buffer.append(b'data')
    transport._write_ready()
    transport._fatal_error.assert_called_with(
        err,
        'Fatal write error on socket transport')
979
def test_write_eof(self):
    # write_eof() shuts down the write side of the socket; calling it a
    # second time must not issue another shutdown().
    transport = self.socket_transport()
    self.assertTrue(transport.can_write_eof())

    transport.write_eof()
    self.sock.shutdown.assert_called_with(socket.SHUT_WR)

    transport.write_eof()
    self.assertEqual(self.sock.shutdown.call_count, 1)

    transport.close()
988
def test_write_eof_buffer(self):
    # While data is still buffered, write_eof() only records the EOF flag;
    # shutdown() is expected once _write_ready() runs with a working send().
    transport = self.socket_transport()

    # First phase: send() blocks, so the payload stays in the buffer.
    self.sock.send.side_effect = BlockingIOError
    transport.write(b'data')
    transport.write_eof()

    self.assertEqual(transport._buffer, list_to_buffer([b'data']))
    self.assertTrue(transport._eof)
    self.assertFalse(self.sock.shutdown.called)

    # Second phase: send() now reports all four bytes as written.
    self.sock.send.side_effect = lambda _: 4
    transport._write_ready()

    self.assertTrue(self.sock.send.called)
    self.sock.shutdown.assert_called_with(socket.SHUT_WR)

    transport.close()
1002
def test_write_eof_after_close(self):
    # write_eof() on a transport that was closed (and whose loop has run
    # one iteration since) must simply not raise; nothing else is asserted.
    transport = self.socket_transport()
    transport.close()
    self.loop.run_until_complete(asyncio.sleep(0))

    transport.write_eof()
1008
@mock.patch('asyncio.base_events.logger')
def test_transport_close_remove_writer(self, m_log):
    # close() must unregister the writer for the socket's file descriptor.
    writer_remover = mock.Mock()
    self.loop._remove_writer = writer_remover

    transport = self.socket_transport()
    transport.close()

    writer_remover.assert_called_with(self.sock_fd)
1016
1017
class SelectorSocketTransportBufferedProtocolTests(test_utils.TestCase):
    """Tests of _SelectorSocketTransport driven by a BufferedProtocol."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

        # Unless a test overrides the side effect, get_buffer() always
        # hands back this same one-byte bytearray.
        self.buf = bytearray(1)
        self.protocol = test_utils.make_test_protocol(asyncio.BufferedProtocol)
        self.protocol.get_buffer.side_effect = lambda hint: self.buf

        self.sock = mock.Mock(socket.socket)
        self.sock.fileno.return_value = 7
        self.sock_fd = 7

    def socket_transport(self, waiter=None):
        # Build the transport under test and register close_transport() as
        # a cleanup for it.
        tr = _SelectorSocketTransport(
            self.loop, self.sock, self.protocol, waiter=waiter)
        self.addCleanup(close_transport, tr)
        return tr

    def test_ctor(self):
        # Constructing the transport registers its reader for fd 7 and,
        # after the loop has run briefly, calls connection_made().
        fut = self.loop.create_future()
        tr = self.socket_transport(waiter=fut)
        self.loop.run_until_complete(fut)

        self.loop.assert_reader(7, tr._read_ready)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_made.assert_called_with(tr)

    def test_get_buffer_error(self):
        # get_buffer() raising is a fatal error; buffer_updated() must not
        # be reached.
        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        self.loop.call_exception_handler = mock.Mock()

        self.protocol.get_buffer.side_effect = LookupError()
        tr._read_ready()

        self.assertTrue(tr._fatal_error.called)
        self.assertTrue(self.protocol.get_buffer.called)
        self.assertFalse(self.protocol.buffer_updated.called)

    def test_get_buffer_zerosized(self):
        # get_buffer() returning an empty buffer is a fatal error;
        # buffer_updated() must not be reached.
        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        self.loop.call_exception_handler = mock.Mock()

        self.protocol.get_buffer.side_effect = lambda hint: bytearray(0)
        tr._read_ready()

        self.assertTrue(tr._fatal_error.called)
        self.assertTrue(self.protocol.get_buffer.called)
        self.assertFalse(self.protocol.buffer_updated.called)

    def test_proto_type_switch(self):
        # Start with a plain streaming protocol: data read through recv()
        # is delivered to data_received().
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        tr = self.socket_transport()

        self.sock.recv.return_value = b'data'
        tr._read_ready()
        self.protocol.data_received.assert_called_with(b'data')

        # switch protocol to a BufferedProtocol
        buffered = test_utils.make_test_protocol(asyncio.BufferedProtocol)
        storage = bytearray(4)
        buffered.get_buffer.side_effect = lambda hint: storage
        tr.set_protocol(buffered)

        self.sock.recv_into.return_value = 10
        tr._read_ready()

        buffered.get_buffer.assert_called_with(-1)
        buffered.buffer_updated.assert_called_with(10)

    def test_buffer_updated_error(self):
        # buffer_updated() raising is a fatal error; both protocol hooks
        # must have been invoked by then.
        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        self.loop.call_exception_handler = mock.Mock()

        self.protocol.buffer_updated.side_effect = LookupError()
        self.sock.recv_into.return_value = 10
        tr._read_ready()

        self.assertTrue(tr._fatal_error.called)
        self.assertTrue(self.protocol.get_buffer.called)
        self.assertTrue(self.protocol.buffer_updated.called)

    def test_read_eof_received_error(self):
        # recv_into() returning 0 triggers eof_received(); an exception
        # raised from it is a fatal error.
        tr = self.socket_transport()
        tr.close = mock.Mock()
        tr._fatal_error = mock.Mock()
        self.loop.call_exception_handler = mock.Mock()

        self.protocol.eof_received.side_effect = LookupError()
        self.sock.recv_into.return_value = 0
        tr._read_ready()

        self.protocol.eof_received.assert_called_with()
        self.assertTrue(tr._fatal_error.called)

    def test_read_ready(self):
        # Normal read: get_buffer(-1) is requested and the byte count from
        # recv_into() is passed on to buffer_updated().
        tr = self.socket_transport()

        self.sock.recv_into.return_value = 10
        tr._read_ready()

        self.protocol.get_buffer.assert_called_with(-1)
        self.protocol.buffer_updated.assert_called_with(10)

    def test_read_ready_eof(self):
        # recv_into() returning 0 calls eof_received() and then close().
        tr = self.socket_transport()
        tr.close = mock.Mock()

        self.sock.recv_into.return_value = 0
        tr._read_ready()

        self.protocol.eof_received.assert_called_with()
        tr.close.assert_called_with()

    def test_read_ready_eof_keep_open(self):
        # When eof_received() returns True the transport must not close.
        tr = self.socket_transport()
        tr.close = mock.Mock()

        self.protocol.eof_received.return_value = True
        self.sock.recv_into.return_value = 0
        tr._read_ready()

        self.protocol.eof_received.assert_called_with()
        self.assertFalse(tr.close.called)

    @mock.patch('logging.exception')
    def test_read_ready_tryagain(self, m_exc):
        # BlockingIOError from recv_into() must not be treated as fatal.
        self.sock.recv_into.side_effect = BlockingIOError

        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._read_ready()

        self.assertFalse(tr._fatal_error.called)

    @mock.patch('logging.exception')
    def test_read_ready_tryagain_interrupted(self, m_exc):
        # InterruptedError from recv_into() must not be treated as fatal.
        self.sock.recv_into.side_effect = InterruptedError

        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._read_ready()

        self.assertFalse(tr._fatal_error.called)

    @mock.patch('logging.exception')
    def test_read_ready_conn_reset(self, m_exc):
        # ConnectionResetError from recv_into() ends up in _force_close()
        # with that same exception instance.
        err = self.sock.recv_into.side_effect = ConnectionResetError()

        tr = self.socket_transport()
        tr._force_close = mock.Mock()
        with test_utils.disable_logger():
            tr._read_ready()

        tr._force_close.assert_called_with(err)

    @mock.patch('logging.exception')
    def test_read_ready_err(self, m_exc):
        # Any other OSError from recv_into() is reported through
        # _fatal_error() with the socket-transport read message.
        err = self.sock.recv_into.side_effect = OSError()

        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._read_ready()

        tr._fatal_error.assert_called_with(
            err,
            'Fatal read error on socket transport')
1195
1196
class SelectorDatagramTransportTests(test_utils.TestCase):
    """Tests of _SelectorDatagramTransport against a mocked socket."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
        self.sock = mock.Mock(spec_set=socket.socket)
        self.sock.fileno.return_value = 7

    def datagram_transport(self, address=None):
        # Build the transport under test.  getpeername() on the mocked
        # socket succeeds only when an address is supplied and raises
        # OSError otherwise (presumably how the transport tells a connected
        # socket from an unconnected one -- confirm against the transport).
        self.sock.getpeername.side_effect = None if address else OSError
        transport = _SelectorDatagramTransport(self.loop, self.sock,
                                               self.protocol,
                                               address=address)
        self.addCleanup(close_transport, transport)
        return transport

    def test_read_ready(self):
        # A datagram returned by recvfrom() is delivered to
        # datagram_received() together with its source address.
        transport = self.datagram_transport()

        self.sock.recvfrom.return_value = (b'data', ('0.0.0.0', 1234))
        transport._read_ready()

        self.protocol.datagram_received.assert_called_with(
            b'data', ('0.0.0.0', 1234))

    def test_transport_inheritance(self):
        transport = self.datagram_transport()
        self.assertIsInstance(transport, asyncio.DatagramTransport)

    def test_read_ready_tryagain(self):
        # BlockingIOError from recvfrom() must not be treated as fatal.
        transport = self.datagram_transport()

        self.sock.recvfrom.side_effect = BlockingIOError
        transport._fatal_error = mock.Mock()
        transport._read_ready()

        self.assertFalse(transport._fatal_error.called)

    def test_read_ready_err(self):
        # A non-OSError exception from recvfrom() is a fatal read error.
        transport = self.datagram_transport()

        err = self.sock.recvfrom.side_effect = RuntimeError()
        transport._fatal_error = mock.Mock()
        transport._read_ready()

        transport._fatal_error.assert_called_with(
            err,
            'Fatal read error on datagram transport')

    def test_read_ready_oserr(self):
        # An OSError from recvfrom() is reported to the protocol through
        # error_received() instead of being fatal.
        transport = self.datagram_transport()

        err = self.sock.recvfrom.side_effect = OSError()
        transport._fatal_error = mock.Mock()
        transport._read_ready()

        self.assertFalse(transport._fatal_error.called)
        self.protocol.error_received.assert_called_with(err)

    def test_sendto(self):
        # With an empty buffer, sendto() passes data and address straight
        # to the socket.
        data = b'data'
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.sock.sendto.called)
        self.assertEqual(
            self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234)))

    def test_sendto_bytearray(self):
        # Same as test_sendto, with a bytearray payload.
        data = bytearray(b'data')
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.sock.sendto.called)
        self.assertEqual(
            self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234)))

    def test_sendto_memoryview(self):
        # Same as test_sendto, with a memoryview payload.
        data = memoryview(b'data')
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.sock.sendto.called)
        self.assertEqual(
            self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234)))

    def test_sendto_no_data(self):
        # Sending an empty payload neither touches the socket nor changes
        # what is already buffered.
        transport = self.datagram_transport()
        transport._buffer.append((b'data', ('0.0.0.0', 12345)))
        transport.sendto(b'', ())
        self.assertFalse(self.sock.sendto.called)
        self.assertEqual(
            [(b'data', ('0.0.0.0', 12345))], list(transport._buffer))

    def test_sendto_buffer(self):
        # When something is already buffered, new datagrams are queued
        # behind it rather than sent immediately.
        transport = self.datagram_transport()
        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
        transport.sendto(b'data2', ('0.0.0.0', 12345))
        self.assertFalse(self.sock.sendto.called)
        self.assertEqual(
            [(b'data1', ('0.0.0.0', 12345)),
             (b'data2', ('0.0.0.0', 12345))],
            list(transport._buffer))

    def test_sendto_buffer_bytearray(self):
        # A queued bytearray payload must be stored as bytes.
        data2 = bytearray(b'data2')
        transport = self.datagram_transport()
        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
        transport.sendto(data2, ('0.0.0.0', 12345))
        self.assertFalse(self.sock.sendto.called)
        self.assertEqual(
            [(b'data1', ('0.0.0.0', 12345)),
             (b'data2', ('0.0.0.0', 12345))],
            list(transport._buffer))
        self.assertIsInstance(transport._buffer[1][0], bytes)

    def test_sendto_buffer_memoryview(self):
        # A queued memoryview payload must be stored as bytes.
        data2 = memoryview(b'data2')
        transport = self.datagram_transport()
        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
        transport.sendto(data2, ('0.0.0.0', 12345))
        self.assertFalse(self.sock.sendto.called)
        self.assertEqual(
            [(b'data1', ('0.0.0.0', 12345)),
             (b'data2', ('0.0.0.0', 12345))],
            list(transport._buffer))
        self.assertIsInstance(transport._buffer[1][0], bytes)

    def test_sendto_tryagain(self):
        # BlockingIOError from the socket queues the datagram and registers
        # _sendto_ready as the writer for fd 7.
        data = b'data'

        self.sock.sendto.side_effect = BlockingIOError

        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 12345))

        self.loop.assert_writer(7, transport._sendto_ready)
        self.assertEqual(
            [(b'data', ('0.0.0.0', 12345))], list(transport._buffer))

    @mock.patch('asyncio.selector_events.logger')
    def test_sendto_exception(self, m_log):
        # A non-OSError exception from the socket is a fatal write error.
        data = b'data'
        err = self.sock.sendto.side_effect = RuntimeError()

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport.sendto(data, ())

        self.assertTrue(transport._fatal_error.called)
        transport._fatal_error.assert_called_with(
            err,
            'Fatal write error on datagram transport')

        # Mark the connection as lost and keep sending; the repeated calls
        # are expected to end in the warning asserted below (presumably
        # after crossing the transport's logging threshold).
        transport._conn_lost = 1

        transport._address = ('123',)
        transport.sendto(data)
        transport.sendto(data)
        transport.sendto(data)
        transport.sendto(data)
        transport.sendto(data)
        m_log.warning.assert_called_with('socket.send() raised exception.')

    def test_sendto_error_received(self):
        # ConnectionRefusedError on an unconnected transport is neither
        # fatal nor counted as a lost connection.
        data = b'data'

        self.sock.sendto.side_effect = ConnectionRefusedError

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport.sendto(data, ())

        self.assertEqual(transport._conn_lost, 0)
        self.assertFalse(transport._fatal_error.called)

    def test_sendto_error_received_connected(self):
        # With a remote address the transport goes through sock.send();
        # ConnectionRefusedError is reported via error_received().
        data = b'data'

        self.sock.send.side_effect = ConnectionRefusedError

        transport = self.datagram_transport(address=('0.0.0.0', 1))
        transport._fatal_error = mock.Mock()
        transport.sendto(data)

        self.assertFalse(transport._fatal_error.called)
        self.assertTrue(self.protocol.error_received.called)

    def test_sendto_str(self):
        # str payloads are rejected with TypeError.
        transport = self.datagram_transport()
        self.assertRaises(TypeError, transport.sendto, 'str', ())

    def test_sendto_connected_addr(self):
        # On a connected transport, sending to a different address raises
        # ValueError.
        transport = self.datagram_transport(address=('0.0.0.0', 1))
        self.assertRaises(
            ValueError, transport.sendto, b'str', ('0.0.0.0', 2))

    def test_sendto_closing(self):
        # close() bumps _conn_lost to 1; a later sendto() bumps it again.
        transport = self.datagram_transport(address=(1,))
        transport.close()
        self.assertEqual(transport._conn_lost, 1)
        transport.sendto(b'data', (1,))
        self.assertEqual(transport._conn_lost, 2)

    def test_sendto_ready(self):
        # _sendto_ready() flushes the queued datagram and, with nothing
        # left to send, leaves no writer registered.
        data = b'data'
        self.sock.sendto.return_value = len(data)

        transport = self.datagram_transport()
        transport._buffer.append((data, ('0.0.0.0', 12345)))
        self.loop._add_writer(7, transport._sendto_ready)
        transport._sendto_ready()
        self.assertTrue(self.sock.sendto.called)
        self.assertEqual(
            self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345)))
        self.assertFalse(self.loop.writers)

    def test_sendto_ready_closing(self):
        # Flushing the last datagram of a closing transport closes the
        # socket and reports connection_lost(None).
        data = b'data'
        # The transport is built without an address, so the call asserted
        # below is sock.sendto(); configure that mock (not sock.send), as
        # test_sendto_ready does.
        self.sock.sendto.return_value = len(data)

        transport = self.datagram_transport()
        transport._closing = True
        transport._buffer.append((data, ()))
        self.loop._add_writer(7, transport._sendto_ready)
        transport._sendto_ready()
        self.sock.sendto.assert_called_with(data, ())
        self.assertFalse(self.loop.writers)
        self.sock.close.assert_called_with()
        self.protocol.connection_lost.assert_called_with(None)

    def test_sendto_ready_no_data(self):
        # With an empty buffer nothing is sent and the writer is removed.
        transport = self.datagram_transport()
        self.loop._add_writer(7, transport._sendto_ready)
        transport._sendto_ready()
        self.assertFalse(self.sock.sendto.called)
        self.assertFalse(self.loop.writers)

    def test_sendto_ready_tryagain(self):
        # BlockingIOError while flushing keeps the writer registered and
        # both datagrams queued in their original order.
        self.sock.sendto.side_effect = BlockingIOError

        transport = self.datagram_transport()
        transport._buffer.extend([(b'data1', ()), (b'data2', ())])
        self.loop._add_writer(7, transport._sendto_ready)
        transport._sendto_ready()

        self.loop.assert_writer(7, transport._sendto_ready)
        self.assertEqual(
            [(b'data1', ()), (b'data2', ())],
            list(transport._buffer))

    def test_sendto_ready_exception(self):
        # A non-OSError exception while flushing is a fatal write error.
        err = self.sock.sendto.side_effect = RuntimeError()

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport._buffer.append((b'data', ()))
        transport._sendto_ready()

        transport._fatal_error.assert_called_with(
            err,
            'Fatal write error on datagram transport')

    def test_sendto_ready_error_received(self):
        # ConnectionRefusedError while flushing an unconnected transport
        # is not fatal.
        self.sock.sendto.side_effect = ConnectionRefusedError

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport._buffer.append((b'data', ()))
        transport._sendto_ready()

        self.assertFalse(transport._fatal_error.called)

    def test_sendto_ready_error_received_connection(self):
        # With a remote address the flush goes through sock.send();
        # ConnectionRefusedError is reported via error_received().
        self.sock.send.side_effect = ConnectionRefusedError

        transport = self.datagram_transport(address=('0.0.0.0', 1))
        transport._fatal_error = mock.Mock()
        transport._buffer.append((b'data', ()))
        transport._sendto_ready()

        self.assertFalse(transport._fatal_error.called)
        self.assertTrue(self.protocol.error_received.called)

    @mock.patch('asyncio.base_events.logger.error')
    def test_fatal_error_connected(self, m_exc):
        # ConnectionRefusedError passed to _fatal_error() is neither
        # forwarded to error_received() nor logged as an error.
        transport = self.datagram_transport(address=('0.0.0.0', 1))
        err = ConnectionRefusedError()
        transport._fatal_error(err)
        self.assertFalse(self.protocol.error_received.called)
        m_exc.assert_not_called()

    @mock.patch('asyncio.base_events.logger.error')
    def test_fatal_error_connected_custom_error(self, m_exc):
        # An arbitrary exception type passed to _fatal_error() is logged
        # with protocol/transport context and exc_info.
        class MyException(Exception):
            pass
        transport = self.datagram_transport(address=('0.0.0.0', 1))
        err = MyException()
        transport._fatal_error(err)
        self.assertFalse(self.protocol.error_received.called)
        m_exc.assert_called_with(
            test_utils.MockPattern(
                'Fatal error on transport\nprotocol:.*\ntransport:.*'),
            exc_info=(MyException, MOCK_ANY, MOCK_ANY))
1498
1499
# Run this module's tests when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()