python (3.12.0)
1 """Tests for base_events.py"""
2
3 import concurrent.futures
4 import errno
5 import math
6 import socket
7 import sys
8 import threading
9 import time
10 import unittest
11 from unittest import mock
12
13 import asyncio
14 from asyncio import base_events
15 from asyncio import constants
16 from test.test_asyncio import utils as test_utils
17 from test import support
18 from test.support.script_helper import assert_python_ok
19 from test.support import os_helper
20 from test.support import socket_helper
21 import warnings
22
23 MOCK_ANY = mock.ANY
24
25
26 def tearDownModule():
27 asyncio.set_event_loop_policy(None)
28
29
30 def mock_socket_module():
31 m_socket = mock.MagicMock(spec=socket)
32 for name in (
33 'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP',
34 'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton'
35 ):
36 if hasattr(socket, name):
37 setattr(m_socket, name, getattr(socket, name))
38 else:
39 delattr(m_socket, name)
40
41 m_socket.socket = mock.MagicMock()
42 m_socket.socket.return_value = test_utils.mock_nonblocking_socket()
43
44 return m_socket
45
46
47 def patch_socket(f):
48 return mock.patch('asyncio.base_events.socket',
49 new_callable=mock_socket_module)(f)
50
51
52 class ESC[4;38;5;81mBaseEventTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
53
54 def test_ipaddr_info(self):
55 UNSPEC = socket.AF_UNSPEC
56 INET = socket.AF_INET
57 INET6 = socket.AF_INET6
58 STREAM = socket.SOCK_STREAM
59 DGRAM = socket.SOCK_DGRAM
60 TCP = socket.IPPROTO_TCP
61 UDP = socket.IPPROTO_UDP
62
63 self.assertEqual(
64 (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
65 base_events._ipaddr_info('1.2.3.4', 1, INET, STREAM, TCP))
66
67 self.assertEqual(
68 (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
69 base_events._ipaddr_info(b'1.2.3.4', 1, INET, STREAM, TCP))
70
71 self.assertEqual(
72 (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
73 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, TCP))
74
75 self.assertEqual(
76 (INET, DGRAM, UDP, '', ('1.2.3.4', 1)),
77 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, UDP))
78
79 # Socket type STREAM implies TCP protocol.
80 self.assertEqual(
81 (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
82 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, 0))
83
84 # Socket type DGRAM implies UDP protocol.
85 self.assertEqual(
86 (INET, DGRAM, UDP, '', ('1.2.3.4', 1)),
87 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, 0))
88
89 # No socket type.
90 self.assertIsNone(
91 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0))
92
93 if socket_helper.IPV6_ENABLED:
94 # IPv4 address with family IPv6.
95 self.assertIsNone(
96 base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP))
97
98 self.assertEqual(
99 (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)),
100 base_events._ipaddr_info('::3', 1, INET6, STREAM, TCP))
101
102 self.assertEqual(
103 (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)),
104 base_events._ipaddr_info('::3', 1, UNSPEC, STREAM, TCP))
105
106 # IPv6 address with family IPv4.
107 self.assertIsNone(
108 base_events._ipaddr_info('::3', 1, INET, STREAM, TCP))
109
110 # IPv6 address with zone index.
111 self.assertIsNone(
112 base_events._ipaddr_info('::3%lo0', 1, INET6, STREAM, TCP))
113
114 def test_port_parameter_types(self):
115 # Test obscure kinds of arguments for "port".
116 INET = socket.AF_INET
117 STREAM = socket.SOCK_STREAM
118 TCP = socket.IPPROTO_TCP
119
120 self.assertEqual(
121 (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
122 base_events._ipaddr_info('1.2.3.4', None, INET, STREAM, TCP))
123
124 self.assertEqual(
125 (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
126 base_events._ipaddr_info('1.2.3.4', b'', INET, STREAM, TCP))
127
128 self.assertEqual(
129 (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
130 base_events._ipaddr_info('1.2.3.4', '', INET, STREAM, TCP))
131
132 self.assertEqual(
133 (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
134 base_events._ipaddr_info('1.2.3.4', '1', INET, STREAM, TCP))
135
136 self.assertEqual(
137 (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
138 base_events._ipaddr_info('1.2.3.4', b'1', INET, STREAM, TCP))
139
140 @patch_socket
141 def test_ipaddr_info_no_inet_pton(self, m_socket):
142 del m_socket.inet_pton
143 self.assertIsNone(base_events._ipaddr_info('1.2.3.4', 1,
144 socket.AF_INET,
145 socket.SOCK_STREAM,
146 socket.IPPROTO_TCP))
147
148
149 class ESC[4;38;5;81mBaseEventLoopTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
150
151 def setUp(self):
152 super().setUp()
153 self.loop = base_events.BaseEventLoop()
154 self.loop._selector = mock.Mock()
155 self.loop._selector.select.return_value = ()
156 self.set_event_loop(self.loop)
157
158 def test_not_implemented(self):
159 m = mock.Mock()
160 self.assertRaises(
161 NotImplementedError,
162 self.loop._make_socket_transport, m, m)
163 self.assertRaises(
164 NotImplementedError,
165 self.loop._make_ssl_transport, m, m, m, m)
166 self.assertRaises(
167 NotImplementedError,
168 self.loop._make_datagram_transport, m, m)
169 self.assertRaises(
170 NotImplementedError, self.loop._process_events, [])
171 self.assertRaises(
172 NotImplementedError, self.loop._write_to_self)
173 self.assertRaises(
174 NotImplementedError,
175 self.loop._make_read_pipe_transport, m, m)
176 self.assertRaises(
177 NotImplementedError,
178 self.loop._make_write_pipe_transport, m, m)
179 gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m)
180 with self.assertRaises(NotImplementedError):
181 gen.send(None)
182
183 def test_close(self):
184 self.assertFalse(self.loop.is_closed())
185 self.loop.close()
186 self.assertTrue(self.loop.is_closed())
187
188 # it should be possible to call close() more than once
189 self.loop.close()
190 self.loop.close()
191
192 # operation blocked when the loop is closed
193 f = self.loop.create_future()
194 self.assertRaises(RuntimeError, self.loop.run_forever)
195 self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
196
197 def test__add_callback_handle(self):
198 h = asyncio.Handle(lambda: False, (), self.loop, None)
199
200 self.loop._add_callback(h)
201 self.assertFalse(self.loop._scheduled)
202 self.assertIn(h, self.loop._ready)
203
204 def test__add_callback_cancelled_handle(self):
205 h = asyncio.Handle(lambda: False, (), self.loop, None)
206 h.cancel()
207
208 self.loop._add_callback(h)
209 self.assertFalse(self.loop._scheduled)
210 self.assertFalse(self.loop._ready)
211
212 def test_set_default_executor(self):
213 class ESC[4;38;5;81mDummyExecutor(ESC[4;38;5;149mconcurrentESC[4;38;5;149m.ESC[4;38;5;149mfuturesESC[4;38;5;149m.ESC[4;38;5;149mThreadPoolExecutor):
214 def submit(self, fn, *args, **kwargs):
215 raise NotImplementedError(
216 'cannot submit into a dummy executor')
217
218 self.loop._process_events = mock.Mock()
219 self.loop._write_to_self = mock.Mock()
220
221 executor = DummyExecutor()
222 self.loop.set_default_executor(executor)
223 self.assertIs(executor, self.loop._default_executor)
224
225 def test_set_default_executor_error(self):
226 executor = mock.Mock()
227
228 msg = 'executor must be ThreadPoolExecutor instance'
229 with self.assertRaisesRegex(TypeError, msg):
230 self.loop.set_default_executor(executor)
231
232 self.assertIsNone(self.loop._default_executor)
233
234 def test_call_soon(self):
235 def cb():
236 pass
237
238 h = self.loop.call_soon(cb)
239 self.assertEqual(h._callback, cb)
240 self.assertIsInstance(h, asyncio.Handle)
241 self.assertIn(h, self.loop._ready)
242
243 def test_call_soon_non_callable(self):
244 self.loop.set_debug(True)
245 with self.assertRaisesRegex(TypeError, 'a callable object'):
246 self.loop.call_soon(1)
247
248 def test_call_later(self):
249 def cb():
250 pass
251
252 h = self.loop.call_later(10.0, cb)
253 self.assertIsInstance(h, asyncio.TimerHandle)
254 self.assertIn(h, self.loop._scheduled)
255 self.assertNotIn(h, self.loop._ready)
256 with self.assertRaises(TypeError, msg="delay must not be None"):
257 self.loop.call_later(None, cb)
258
259 def test_call_later_negative_delays(self):
260 calls = []
261
262 def cb(arg):
263 calls.append(arg)
264
265 self.loop._process_events = mock.Mock()
266 self.loop.call_later(-1, cb, 'a')
267 self.loop.call_later(-2, cb, 'b')
268 test_utils.run_briefly(self.loop)
269 self.assertEqual(calls, ['b', 'a'])
270
271 def test_time_and_call_at(self):
272 def cb():
273 self.loop.stop()
274
275 self.loop._process_events = mock.Mock()
276 delay = 0.1
277
278 when = self.loop.time() + delay
279 self.loop.call_at(when, cb)
280 t0 = self.loop.time()
281 self.loop.run_forever()
282 dt = self.loop.time() - t0
283
284 # 50 ms: maximum granularity of the event loop
285 self.assertGreaterEqual(dt, delay - 0.050, dt)
286 # tolerate a difference of +800 ms because some Python buildbots
287 # are really slow
288 self.assertLessEqual(dt, 0.9, dt)
289 with self.assertRaises(TypeError, msg="when cannot be None"):
290 self.loop.call_at(None, cb)
291
292 def check_thread(self, loop, debug):
293 def cb():
294 pass
295
296 loop.set_debug(debug)
297 if debug:
298 msg = ("Non-thread-safe operation invoked on an event loop other "
299 "than the current one")
300 with self.assertRaisesRegex(RuntimeError, msg):
301 loop.call_soon(cb)
302 with self.assertRaisesRegex(RuntimeError, msg):
303 loop.call_later(60, cb)
304 with self.assertRaisesRegex(RuntimeError, msg):
305 loop.call_at(loop.time() + 60, cb)
306 else:
307 loop.call_soon(cb)
308 loop.call_later(60, cb)
309 loop.call_at(loop.time() + 60, cb)
310
311 def test_check_thread(self):
312 def check_in_thread(loop, event, debug, create_loop, fut):
313 # wait until the event loop is running
314 event.wait()
315
316 try:
317 if create_loop:
318 loop2 = base_events.BaseEventLoop()
319 try:
320 asyncio.set_event_loop(loop2)
321 self.check_thread(loop, debug)
322 finally:
323 asyncio.set_event_loop(None)
324 loop2.close()
325 else:
326 self.check_thread(loop, debug)
327 except Exception as exc:
328 loop.call_soon_threadsafe(fut.set_exception, exc)
329 else:
330 loop.call_soon_threadsafe(fut.set_result, None)
331
332 def test_thread(loop, debug, create_loop=False):
333 event = threading.Event()
334 fut = loop.create_future()
335 loop.call_soon(event.set)
336 args = (loop, event, debug, create_loop, fut)
337 thread = threading.Thread(target=check_in_thread, args=args)
338 thread.start()
339 loop.run_until_complete(fut)
340 thread.join()
341
342 self.loop._process_events = mock.Mock()
343 self.loop._write_to_self = mock.Mock()
344
345 # raise RuntimeError if the thread has no event loop
346 test_thread(self.loop, True)
347
348 # check disabled if debug mode is disabled
349 test_thread(self.loop, False)
350
351 # raise RuntimeError if the event loop of the thread is not the called
352 # event loop
353 test_thread(self.loop, True, create_loop=True)
354
355 # check disabled if debug mode is disabled
356 test_thread(self.loop, False, create_loop=True)
357
358 def test__run_once(self):
359 h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (),
360 self.loop, None)
361 h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (),
362 self.loop, None)
363
364 h1.cancel()
365
366 self.loop._process_events = mock.Mock()
367 self.loop._scheduled.append(h1)
368 self.loop._scheduled.append(h2)
369 self.loop._run_once()
370
371 t = self.loop._selector.select.call_args[0][0]
372 self.assertTrue(9.5 < t < 10.5, t)
373 self.assertEqual([h2], self.loop._scheduled)
374 self.assertTrue(self.loop._process_events.called)
375
376 def test_set_debug(self):
377 self.loop.set_debug(True)
378 self.assertTrue(self.loop.get_debug())
379 self.loop.set_debug(False)
380 self.assertFalse(self.loop.get_debug())
381
382 def test__run_once_schedule_handle(self):
383 handle = None
384 processed = False
385
386 def cb(loop):
387 nonlocal processed, handle
388 processed = True
389 handle = loop.call_soon(lambda: True)
390
391 h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,),
392 self.loop, None)
393
394 self.loop._process_events = mock.Mock()
395 self.loop._scheduled.append(h)
396 self.loop._run_once()
397
398 self.assertTrue(processed)
399 self.assertEqual([handle], list(self.loop._ready))
400
401 def test__run_once_cancelled_event_cleanup(self):
402 self.loop._process_events = mock.Mock()
403
404 self.assertTrue(
405 0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0)
406
407 def cb():
408 pass
409
410 # Set up one "blocking" event that will not be cancelled to
411 # ensure later cancelled events do not make it to the head
412 # of the queue and get cleaned.
413 not_cancelled_count = 1
414 self.loop.call_later(3000, cb)
415
416 # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES)
417 # cancelled handles, ensure they aren't removed
418
419 cancelled_count = 2
420 for x in range(2):
421 h = self.loop.call_later(3600, cb)
422 h.cancel()
423
424 # Add some cancelled events that will be at head and removed
425 cancelled_count += 2
426 for x in range(2):
427 h = self.loop.call_later(100, cb)
428 h.cancel()
429
430 # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low
431 self.assertLessEqual(cancelled_count + not_cancelled_count,
432 base_events._MIN_SCHEDULED_TIMER_HANDLES)
433
434 self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
435
436 self.loop._run_once()
437
438 cancelled_count -= 2
439
440 self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
441
442 self.assertEqual(len(self.loop._scheduled),
443 cancelled_count + not_cancelled_count)
444
445 # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION
446 # so that deletion of cancelled events will occur on next _run_once
447 add_cancel_count = int(math.ceil(
448 base_events._MIN_SCHEDULED_TIMER_HANDLES *
449 base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1
450
451 add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES -
452 add_cancel_count, 0)
453
454 # Add some events that will not be cancelled
455 not_cancelled_count += add_not_cancel_count
456 for x in range(add_not_cancel_count):
457 self.loop.call_later(3600, cb)
458
459 # Add enough cancelled events
460 cancelled_count += add_cancel_count
461 for x in range(add_cancel_count):
462 h = self.loop.call_later(3600, cb)
463 h.cancel()
464
465 # Ensure all handles are still scheduled
466 self.assertEqual(len(self.loop._scheduled),
467 cancelled_count + not_cancelled_count)
468
469 self.loop._run_once()
470
471 # Ensure cancelled events were removed
472 self.assertEqual(len(self.loop._scheduled), not_cancelled_count)
473
474 # Ensure only uncancelled events remain scheduled
475 self.assertTrue(all([not x._cancelled for x in self.loop._scheduled]))
476
477 def test_run_until_complete_type_error(self):
478 self.assertRaises(TypeError,
479 self.loop.run_until_complete, 'blah')
480
481 def test_run_until_complete_loop(self):
482 task = self.loop.create_future()
483 other_loop = self.new_test_loop()
484 self.addCleanup(other_loop.close)
485 self.assertRaises(ValueError,
486 other_loop.run_until_complete, task)
487
488 def test_run_until_complete_loop_orphan_future_close_loop(self):
489 class ESC[4;38;5;81mShowStopper(ESC[4;38;5;149mSystemExit):
490 pass
491
492 async def foo(delay):
493 await asyncio.sleep(delay)
494
495 def throw():
496 raise ShowStopper
497
498 self.loop._process_events = mock.Mock()
499 self.loop.call_soon(throw)
500 with self.assertRaises(ShowStopper):
501 self.loop.run_until_complete(foo(0.1))
502
503 # This call fails if run_until_complete does not clean up
504 # done-callback for the previous future.
505 self.loop.run_until_complete(foo(0.2))
506
507 def test_subprocess_exec_invalid_args(self):
508 args = [sys.executable, '-c', 'pass']
509
510 # missing program parameter (empty args)
511 self.assertRaises(TypeError,
512 self.loop.run_until_complete, self.loop.subprocess_exec,
513 asyncio.SubprocessProtocol)
514
515 # expected multiple arguments, not a list
516 self.assertRaises(TypeError,
517 self.loop.run_until_complete, self.loop.subprocess_exec,
518 asyncio.SubprocessProtocol, args)
519
520 # program arguments must be strings, not int
521 self.assertRaises(TypeError,
522 self.loop.run_until_complete, self.loop.subprocess_exec,
523 asyncio.SubprocessProtocol, sys.executable, 123)
524
525 # universal_newlines, shell, bufsize must not be set
526 self.assertRaises(TypeError,
527 self.loop.run_until_complete, self.loop.subprocess_exec,
528 asyncio.SubprocessProtocol, *args, universal_newlines=True)
529 self.assertRaises(TypeError,
530 self.loop.run_until_complete, self.loop.subprocess_exec,
531 asyncio.SubprocessProtocol, *args, shell=True)
532 self.assertRaises(TypeError,
533 self.loop.run_until_complete, self.loop.subprocess_exec,
534 asyncio.SubprocessProtocol, *args, bufsize=4096)
535
536 def test_subprocess_shell_invalid_args(self):
537 # expected a string, not an int or a list
538 self.assertRaises(TypeError,
539 self.loop.run_until_complete, self.loop.subprocess_shell,
540 asyncio.SubprocessProtocol, 123)
541 self.assertRaises(TypeError,
542 self.loop.run_until_complete, self.loop.subprocess_shell,
543 asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass'])
544
545 # universal_newlines, shell, bufsize must not be set
546 self.assertRaises(TypeError,
547 self.loop.run_until_complete, self.loop.subprocess_shell,
548 asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True)
549 self.assertRaises(TypeError,
550 self.loop.run_until_complete, self.loop.subprocess_shell,
551 asyncio.SubprocessProtocol, 'exit 0', shell=True)
552 self.assertRaises(TypeError,
553 self.loop.run_until_complete, self.loop.subprocess_shell,
554 asyncio.SubprocessProtocol, 'exit 0', bufsize=4096)
555
556 def test_default_exc_handler_callback(self):
557 self.loop._process_events = mock.Mock()
558
559 def zero_error(fut):
560 fut.set_result(True)
561 1/0
562
563 # Test call_soon (events.Handle)
564 with mock.patch('asyncio.base_events.logger') as log:
565 fut = self.loop.create_future()
566 self.loop.call_soon(zero_error, fut)
567 fut.add_done_callback(lambda fut: self.loop.stop())
568 self.loop.run_forever()
569 log.error.assert_called_with(
570 test_utils.MockPattern('Exception in callback.*zero'),
571 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
572
573 # Test call_later (events.TimerHandle)
574 with mock.patch('asyncio.base_events.logger') as log:
575 fut = self.loop.create_future()
576 self.loop.call_later(0.01, zero_error, fut)
577 fut.add_done_callback(lambda fut: self.loop.stop())
578 self.loop.run_forever()
579 log.error.assert_called_with(
580 test_utils.MockPattern('Exception in callback.*zero'),
581 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
582
583 def test_default_exc_handler_coro(self):
584 self.loop._process_events = mock.Mock()
585
586 async def zero_error_coro():
587 await asyncio.sleep(0.01)
588 1/0
589
590 # Test Future.__del__
591 with mock.patch('asyncio.base_events.logger') as log:
592 fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop)
593 fut.add_done_callback(lambda *args: self.loop.stop())
594 self.loop.run_forever()
595 fut = None # Trigger Future.__del__ or futures._TracebackLogger
596 support.gc_collect()
597 # Future.__del__ in logs error with an actual exception context
598 log.error.assert_called_with(
599 test_utils.MockPattern('.*exception was never retrieved'),
600 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
601
602 def test_set_exc_handler_invalid(self):
603 with self.assertRaisesRegex(TypeError, 'A callable object or None'):
604 self.loop.set_exception_handler('spam')
605
606 def test_set_exc_handler_custom(self):
607 def zero_error():
608 1/0
609
610 def run_loop():
611 handle = self.loop.call_soon(zero_error)
612 self.loop._run_once()
613 return handle
614
615 self.loop.set_debug(True)
616 self.loop._process_events = mock.Mock()
617
618 self.assertIsNone(self.loop.get_exception_handler())
619 mock_handler = mock.Mock()
620 self.loop.set_exception_handler(mock_handler)
621 self.assertIs(self.loop.get_exception_handler(), mock_handler)
622 handle = run_loop()
623 mock_handler.assert_called_with(self.loop, {
624 'exception': MOCK_ANY,
625 'message': test_utils.MockPattern(
626 'Exception in callback.*zero_error'),
627 'handle': handle,
628 'source_traceback': handle._source_traceback,
629 })
630 mock_handler.reset_mock()
631
632 self.loop.set_exception_handler(None)
633 with mock.patch('asyncio.base_events.logger') as log:
634 run_loop()
635 log.error.assert_called_with(
636 test_utils.MockPattern(
637 'Exception in callback.*zero'),
638 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
639
640 self.assertFalse(mock_handler.called)
641
642 def test_set_exc_handler_broken(self):
643 def run_loop():
644 def zero_error():
645 1/0
646 self.loop.call_soon(zero_error)
647 self.loop._run_once()
648
649 def handler(loop, context):
650 raise AttributeError('spam')
651
652 self.loop._process_events = mock.Mock()
653
654 self.loop.set_exception_handler(handler)
655
656 with mock.patch('asyncio.base_events.logger') as log:
657 run_loop()
658 log.error.assert_called_with(
659 test_utils.MockPattern(
660 'Unhandled error in exception handler'),
661 exc_info=(AttributeError, MOCK_ANY, MOCK_ANY))
662
663 def test_default_exc_handler_broken(self):
664 _context = None
665
666 class ESC[4;38;5;81mLoop(ESC[4;38;5;149mbase_eventsESC[4;38;5;149m.ESC[4;38;5;149mBaseEventLoop):
667
668 _selector = mock.Mock()
669 _process_events = mock.Mock()
670
671 def default_exception_handler(self, context):
672 nonlocal _context
673 _context = context
674 # Simulates custom buggy "default_exception_handler"
675 raise ValueError('spam')
676
677 loop = Loop()
678 self.addCleanup(loop.close)
679 asyncio.set_event_loop(loop)
680
681 def run_loop():
682 def zero_error():
683 1/0
684 loop.call_soon(zero_error)
685 loop._run_once()
686
687 with mock.patch('asyncio.base_events.logger') as log:
688 run_loop()
689 log.error.assert_called_with(
690 'Exception in default exception handler',
691 exc_info=True)
692
693 def custom_handler(loop, context):
694 raise ValueError('ham')
695
696 _context = None
697 loop.set_exception_handler(custom_handler)
698 with mock.patch('asyncio.base_events.logger') as log:
699 run_loop()
700 log.error.assert_called_with(
701 test_utils.MockPattern('Exception in default exception.*'
702 'while handling.*in custom'),
703 exc_info=True)
704
705 # Check that original context was passed to default
706 # exception handler.
707 self.assertIn('context', _context)
708 self.assertIs(type(_context['context']['exception']),
709 ZeroDivisionError)
710
711 def test_set_task_factory_invalid(self):
712 with self.assertRaisesRegex(
713 TypeError, 'task factory must be a callable or None'):
714
715 self.loop.set_task_factory(1)
716
717 self.assertIsNone(self.loop.get_task_factory())
718
719 def test_set_task_factory(self):
720 self.loop._process_events = mock.Mock()
721
722 class ESC[4;38;5;81mMyTask(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mTask):
723 pass
724
725 async def coro():
726 pass
727
728 factory = lambda loop, coro: MyTask(coro, loop=loop)
729
730 self.assertIsNone(self.loop.get_task_factory())
731 self.loop.set_task_factory(factory)
732 self.assertIs(self.loop.get_task_factory(), factory)
733
734 task = self.loop.create_task(coro())
735 self.assertTrue(isinstance(task, MyTask))
736 self.loop.run_until_complete(task)
737
738 self.loop.set_task_factory(None)
739 self.assertIsNone(self.loop.get_task_factory())
740
741 task = self.loop.create_task(coro())
742 self.assertTrue(isinstance(task, asyncio.Task))
743 self.assertFalse(isinstance(task, MyTask))
744 self.loop.run_until_complete(task)
745
746 def test_env_var_debug(self):
747 code = '\n'.join((
748 'import asyncio',
749 'loop = asyncio.new_event_loop()',
750 'print(loop.get_debug())'))
751
752 # Test with -E to not fail if the unit test was run with
753 # PYTHONASYNCIODEBUG set to a non-empty string
754 sts, stdout, stderr = assert_python_ok('-E', '-c', code)
755 self.assertEqual(stdout.rstrip(), b'False')
756
757 sts, stdout, stderr = assert_python_ok('-c', code,
758 PYTHONASYNCIODEBUG='',
759 PYTHONDEVMODE='')
760 self.assertEqual(stdout.rstrip(), b'False')
761
762 sts, stdout, stderr = assert_python_ok('-c', code,
763 PYTHONASYNCIODEBUG='1',
764 PYTHONDEVMODE='')
765 self.assertEqual(stdout.rstrip(), b'True')
766
767 sts, stdout, stderr = assert_python_ok('-E', '-c', code,
768 PYTHONASYNCIODEBUG='1')
769 self.assertEqual(stdout.rstrip(), b'False')
770
771 # -X dev
772 sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
773 '-c', code)
774 self.assertEqual(stdout.rstrip(), b'True')
775
776 def test_create_task(self):
777 class ESC[4;38;5;81mMyTask(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mTask):
778 pass
779
780 async def test():
781 pass
782
783 class ESC[4;38;5;81mEventLoop(ESC[4;38;5;149mbase_eventsESC[4;38;5;149m.ESC[4;38;5;149mBaseEventLoop):
784 def create_task(self, coro):
785 return MyTask(coro, loop=loop)
786
787 loop = EventLoop()
788 self.set_event_loop(loop)
789
790 coro = test()
791 task = asyncio.ensure_future(coro, loop=loop)
792 self.assertIsInstance(task, MyTask)
793
794 # make warnings quiet
795 task._log_destroy_pending = False
796 coro.close()
797
798 def test_create_task_error_closes_coro(self):
799 async def test():
800 pass
801 loop = asyncio.new_event_loop()
802 loop.close()
803 with warnings.catch_warnings(record=True) as w:
804 with self.assertRaises(RuntimeError):
805 asyncio.ensure_future(test(), loop=loop)
806 self.assertEqual(len(w), 0)
807
808
809 def test_create_named_task_with_default_factory(self):
810 async def test():
811 pass
812
813 loop = asyncio.new_event_loop()
814 task = loop.create_task(test(), name='test_task')
815 try:
816 self.assertEqual(task.get_name(), 'test_task')
817 finally:
818 loop.run_until_complete(task)
819 loop.close()
820
821 def test_create_named_task_with_custom_factory(self):
822 def task_factory(loop, coro):
823 return asyncio.Task(coro, loop=loop)
824
825 async def test():
826 pass
827
828 loop = asyncio.new_event_loop()
829 loop.set_task_factory(task_factory)
830 task = loop.create_task(test(), name='test_task')
831 try:
832 self.assertEqual(task.get_name(), 'test_task')
833 finally:
834 loop.run_until_complete(task)
835 loop.close()
836
837 def test_run_forever_keyboard_interrupt(self):
838 # Python issue #22601: ensure that the temporary task created by
839 # run_forever() consumes the KeyboardInterrupt and so don't log
840 # a warning
841 async def raise_keyboard_interrupt():
842 raise KeyboardInterrupt
843
844 self.loop._process_events = mock.Mock()
845 self.loop.call_exception_handler = mock.Mock()
846
847 try:
848 self.loop.run_until_complete(raise_keyboard_interrupt())
849 except KeyboardInterrupt:
850 pass
851 self.loop.close()
852 support.gc_collect()
853
854 self.assertFalse(self.loop.call_exception_handler.called)
855
856 def test_run_until_complete_baseexception(self):
857 # Python issue #22429: run_until_complete() must not schedule a pending
858 # call to stop() if the future raised a BaseException
859 async def raise_keyboard_interrupt():
860 raise KeyboardInterrupt
861
862 self.loop._process_events = mock.Mock()
863
864 with self.assertRaises(KeyboardInterrupt):
865 self.loop.run_until_complete(raise_keyboard_interrupt())
866
867 def func():
868 self.loop.stop()
869 func.called = True
870 func.called = False
871 self.loop.call_soon(self.loop.call_soon, func)
872 self.loop.run_forever()
873 self.assertTrue(func.called)
874
875 def test_single_selecter_event_callback_after_stopping(self):
876 # Python issue #25593: A stopped event loop may cause event callbacks
877 # to run more than once.
878 event_sentinel = object()
879 callcount = 0
880 doer = None
881
882 def proc_events(event_list):
883 nonlocal doer
884 if event_sentinel in event_list:
885 doer = self.loop.call_soon(do_event)
886
887 def do_event():
888 nonlocal callcount
889 callcount += 1
890 self.loop.call_soon(clear_selector)
891
892 def clear_selector():
893 doer.cancel()
894 self.loop._selector.select.return_value = ()
895
896 self.loop._process_events = proc_events
897 self.loop._selector.select.return_value = (event_sentinel,)
898
899 for i in range(1, 3):
900 with self.subTest('Loop %d/2' % i):
901 self.loop.call_soon(self.loop.stop)
902 self.loop.run_forever()
903 self.assertEqual(callcount, 1)
904
905 def test_run_once(self):
906 # Simple test for test_utils.run_once(). It may seem strange
907 # to have a test for this (the function isn't even used!) but
908 # it's a de-factor standard API for library tests. This tests
909 # the idiom: loop.call_soon(loop.stop); loop.run_forever().
910 count = 0
911
912 def callback():
913 nonlocal count
914 count += 1
915
916 self.loop._process_events = mock.Mock()
917 self.loop.call_soon(callback)
918 test_utils.run_once(self.loop)
919 self.assertEqual(count, 1)
920
921 def test_run_forever_pre_stopped(self):
922 # Test that the old idiom for pre-stopping the loop works.
923 self.loop._process_events = mock.Mock()
924 self.loop.stop()
925 self.loop.run_forever()
926 self.loop._selector.select.assert_called_once_with(0)
927
928 async def leave_unfinalized_asyncgen(self):
929 # Create an async generator, iterate it partially, and leave it
930 # to be garbage collected.
931 # Used in async generator finalization tests.
932 # Depends on implementation details of garbage collector. Changes
933 # in gc may break this function.
934 status = {'started': False,
935 'stopped': False,
936 'finalized': False}
937
938 async def agen():
939 status['started'] = True
940 try:
941 for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']:
942 yield item
943 finally:
944 status['finalized'] = True
945
946 ag = agen()
947 ai = ag.__aiter__()
948
949 async def iter_one():
950 try:
951 item = await ai.__anext__()
952 except StopAsyncIteration:
953 return
954 if item == 'THREE':
955 status['stopped'] = True
956 return
957 asyncio.create_task(iter_one())
958
959 asyncio.create_task(iter_one())
960 return status
961
962 def test_asyncgen_finalization_by_gc(self):
963 # Async generators should be finalized when garbage collected.
964 self.loop._process_events = mock.Mock()
965 self.loop._write_to_self = mock.Mock()
966 with support.disable_gc():
967 status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
968 while not status['stopped']:
969 test_utils.run_briefly(self.loop)
970 self.assertTrue(status['started'])
971 self.assertTrue(status['stopped'])
972 self.assertFalse(status['finalized'])
973 support.gc_collect()
974 test_utils.run_briefly(self.loop)
975 self.assertTrue(status['finalized'])
976
977 def test_asyncgen_finalization_by_gc_in_other_thread(self):
978 # Python issue 34769: If garbage collector runs in another
979 # thread, async generators will not finalize in debug
980 # mode.
981 self.loop._process_events = mock.Mock()
982 self.loop._write_to_self = mock.Mock()
983 self.loop.set_debug(True)
984 with support.disable_gc():
985 status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
986 while not status['stopped']:
987 test_utils.run_briefly(self.loop)
988 self.assertTrue(status['started'])
989 self.assertTrue(status['stopped'])
990 self.assertFalse(status['finalized'])
991 self.loop.run_until_complete(
992 self.loop.run_in_executor(None, support.gc_collect))
993 test_utils.run_briefly(self.loop)
994 self.assertTrue(status['finalized'])
995
996
997 class ESC[4;38;5;81mMyProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mProtocol):
998 done = None
999
1000 def __init__(self, create_future=False):
1001 self.state = 'INITIAL'
1002 self.nbytes = 0
1003 if create_future:
1004 self.done = asyncio.get_running_loop().create_future()
1005
1006 def _assert_state(self, *expected):
1007 if self.state not in expected:
1008 raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
1009
1010 def connection_made(self, transport):
1011 self.transport = transport
1012 self._assert_state('INITIAL')
1013 self.state = 'CONNECTED'
1014 transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
1015
1016 def data_received(self, data):
1017 self._assert_state('CONNECTED')
1018 self.nbytes += len(data)
1019
1020 def eof_received(self):
1021 self._assert_state('CONNECTED')
1022 self.state = 'EOF'
1023
1024 def connection_lost(self, exc):
1025 self._assert_state('CONNECTED', 'EOF')
1026 self.state = 'CLOSED'
1027 if self.done:
1028 self.done.set_result(None)
1029
1030
1031 class ESC[4;38;5;81mMyDatagramProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mDatagramProtocol):
1032 done = None
1033
1034 def __init__(self, create_future=False, loop=None):
1035 self.state = 'INITIAL'
1036 self.nbytes = 0
1037 if create_future:
1038 self.done = loop.create_future()
1039
1040 def _assert_state(self, expected):
1041 if self.state != expected:
1042 raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
1043
1044 def connection_made(self, transport):
1045 self.transport = transport
1046 self._assert_state('INITIAL')
1047 self.state = 'INITIALIZED'
1048
1049 def datagram_received(self, data, addr):
1050 self._assert_state('INITIALIZED')
1051 self.nbytes += len(data)
1052
1053 def error_received(self, exc):
1054 self._assert_state('INITIALIZED')
1055
1056 def connection_lost(self, exc):
1057 self._assert_state('INITIALIZED')
1058 self.state = 'CLOSED'
1059 if self.done:
1060 self.done.set_result(None)
1061
1062
1063 class ESC[4;38;5;81mBaseEventLoopWithSelectorTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1064
1065 def setUp(self):
1066 super().setUp()
1067 self.loop = asyncio.SelectorEventLoop()
1068 self.set_event_loop(self.loop)
1069
1070 @mock.patch('socket.getnameinfo')
1071 def test_getnameinfo(self, m_gai):
1072 m_gai.side_effect = lambda *args: 42
1073 r = self.loop.run_until_complete(self.loop.getnameinfo(('abc', 123)))
1074 self.assertEqual(r, 42)
1075
1076 @patch_socket
1077 def test_create_connection_multiple_errors(self, m_socket):
1078
1079 class ESC[4;38;5;81mMyProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mProtocol):
1080 pass
1081
1082 async def getaddrinfo(*args, **kw):
1083 return [(2, 1, 6, '', ('107.6.106.82', 80)),
1084 (2, 1, 6, '', ('107.6.106.82', 80))]
1085
1086 def getaddrinfo_task(*args, **kwds):
1087 return self.loop.create_task(getaddrinfo(*args, **kwds))
1088
1089 idx = -1
1090 errors = ['err1', 'err2']
1091
1092 def _socket(*args, **kw):
1093 nonlocal idx, errors
1094 idx += 1
1095 raise OSError(errors[idx])
1096
1097 m_socket.socket = _socket
1098
1099 self.loop.getaddrinfo = getaddrinfo_task
1100
1101 coro = self.loop.create_connection(MyProto, 'example.com', 80)
1102 with self.assertRaises(OSError) as cm:
1103 self.loop.run_until_complete(coro)
1104
1105 self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2')
1106
1107 idx = -1
1108 coro = self.loop.create_connection(MyProto, 'example.com', 80, all_errors=True)
1109 with self.assertRaises(ExceptionGroup) as cm:
1110 self.loop.run_until_complete(coro)
1111
1112 self.assertIsInstance(cm.exception, ExceptionGroup)
1113 for e in cm.exception.exceptions:
1114 self.assertIsInstance(e, OSError)
1115
1116 @patch_socket
1117 def test_create_connection_timeout(self, m_socket):
1118 # Ensure that the socket is closed on timeout
1119 sock = mock.Mock()
1120 m_socket.socket.return_value = sock
1121
1122 def getaddrinfo(*args, **kw):
1123 fut = self.loop.create_future()
1124 addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '',
1125 ('127.0.0.1', 80))
1126 fut.set_result([addr])
1127 return fut
1128 self.loop.getaddrinfo = getaddrinfo
1129
1130 with mock.patch.object(self.loop, 'sock_connect',
1131 side_effect=asyncio.TimeoutError):
1132 coro = self.loop.create_connection(MyProto, '127.0.0.1', 80)
1133 with self.assertRaises(asyncio.TimeoutError):
1134 self.loop.run_until_complete(coro)
1135 self.assertTrue(sock.close.called)
1136
1137 def test_create_connection_host_port_sock(self):
1138 coro = self.loop.create_connection(
1139 MyProto, 'example.com', 80, sock=object())
1140 self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1141
1142 def test_create_connection_wrong_sock(self):
1143 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1144 with sock:
1145 coro = self.loop.create_connection(MyProto, sock=sock)
1146 with self.assertRaisesRegex(ValueError,
1147 'A Stream Socket was expected'):
1148 self.loop.run_until_complete(coro)
1149
1150 def test_create_server_wrong_sock(self):
1151 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1152 with sock:
1153 coro = self.loop.create_server(MyProto, sock=sock)
1154 with self.assertRaisesRegex(ValueError,
1155 'A Stream Socket was expected'):
1156 self.loop.run_until_complete(coro)
1157
1158 def test_create_server_ssl_timeout_for_plain_socket(self):
1159 coro = self.loop.create_server(
1160 MyProto, 'example.com', 80, ssl_handshake_timeout=1)
1161 with self.assertRaisesRegex(
1162 ValueError,
1163 'ssl_handshake_timeout is only meaningful with ssl'):
1164 self.loop.run_until_complete(coro)
1165
1166 @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
1167 'no socket.SOCK_NONBLOCK (linux only)')
1168 def test_create_server_stream_bittype(self):
1169 sock = socket.socket(
1170 socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
1171 with sock:
1172 coro = self.loop.create_server(lambda: None, sock=sock)
1173 srv = self.loop.run_until_complete(coro)
1174 srv.close()
1175 self.loop.run_until_complete(srv.wait_closed())
1176
1177 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
1178 def test_create_server_ipv6(self):
1179 async def main():
1180 srv = await asyncio.start_server(lambda: None, '::1', 0)
1181 try:
1182 self.assertGreater(len(srv.sockets), 0)
1183 finally:
1184 srv.close()
1185 await srv.wait_closed()
1186
1187 try:
1188 self.loop.run_until_complete(main())
1189 except OSError as ex:
1190 if (hasattr(errno, 'EADDRNOTAVAIL') and
1191 ex.errno == errno.EADDRNOTAVAIL):
1192 self.skipTest('failed to bind to ::1')
1193 else:
1194 raise
1195
1196 def test_create_datagram_endpoint_wrong_sock(self):
1197 sock = socket.socket(socket.AF_INET)
1198 with sock:
1199 coro = self.loop.create_datagram_endpoint(MyProto, sock=sock)
1200 with self.assertRaisesRegex(ValueError,
1201 'A UDP Socket was expected'):
1202 self.loop.run_until_complete(coro)
1203
1204 def test_create_connection_no_host_port_sock(self):
1205 coro = self.loop.create_connection(MyProto)
1206 self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1207
1208 def test_create_connection_no_getaddrinfo(self):
1209 async def getaddrinfo(*args, **kw):
1210 return []
1211
1212 def getaddrinfo_task(*args, **kwds):
1213 return self.loop.create_task(getaddrinfo(*args, **kwds))
1214
1215 self.loop.getaddrinfo = getaddrinfo_task
1216 coro = self.loop.create_connection(MyProto, 'example.com', 80)
1217 self.assertRaises(
1218 OSError, self.loop.run_until_complete, coro)
1219
1220 def test_create_connection_connect_err(self):
1221 async def getaddrinfo(*args, **kw):
1222 return [(2, 1, 6, '', ('107.6.106.82', 80))]
1223
1224 def getaddrinfo_task(*args, **kwds):
1225 return self.loop.create_task(getaddrinfo(*args, **kwds))
1226
1227 self.loop.getaddrinfo = getaddrinfo_task
1228 self.loop.sock_connect = mock.Mock()
1229 self.loop.sock_connect.side_effect = OSError
1230
1231 coro = self.loop.create_connection(MyProto, 'example.com', 80)
1232 self.assertRaises(
1233 OSError, self.loop.run_until_complete, coro)
1234
1235 coro = self.loop.create_connection(MyProto, 'example.com', 80, all_errors=True)
1236 with self.assertRaises(ExceptionGroup) as cm:
1237 self.loop.run_until_complete(coro)
1238
1239 self.assertIsInstance(cm.exception, ExceptionGroup)
1240 self.assertEqual(len(cm.exception.exceptions), 1)
1241 self.assertIsInstance(cm.exception.exceptions[0], OSError)
1242
1243 def test_create_connection_multiple(self):
1244 async def getaddrinfo(*args, **kw):
1245 return [(2, 1, 6, '', ('0.0.0.1', 80)),
1246 (2, 1, 6, '', ('0.0.0.2', 80))]
1247
1248 def getaddrinfo_task(*args, **kwds):
1249 return self.loop.create_task(getaddrinfo(*args, **kwds))
1250
1251 self.loop.getaddrinfo = getaddrinfo_task
1252 self.loop.sock_connect = mock.Mock()
1253 self.loop.sock_connect.side_effect = OSError
1254
1255 coro = self.loop.create_connection(
1256 MyProto, 'example.com', 80, family=socket.AF_INET)
1257 with self.assertRaises(OSError):
1258 self.loop.run_until_complete(coro)
1259
1260 coro = self.loop.create_connection(
1261 MyProto, 'example.com', 80, family=socket.AF_INET, all_errors=True)
1262 with self.assertRaises(ExceptionGroup) as cm:
1263 self.loop.run_until_complete(coro)
1264
1265 self.assertIsInstance(cm.exception, ExceptionGroup)
1266 for e in cm.exception.exceptions:
1267 self.assertIsInstance(e, OSError)
1268
1269 @patch_socket
1270 def test_create_connection_multiple_errors_local_addr(self, m_socket):
1271
1272 def bind(addr):
1273 if addr[0] == '0.0.0.1':
1274 err = OSError('Err')
1275 err.strerror = 'Err'
1276 raise err
1277
1278 m_socket.socket.return_value.bind = bind
1279
1280 async def getaddrinfo(*args, **kw):
1281 return [(2, 1, 6, '', ('0.0.0.1', 80)),
1282 (2, 1, 6, '', ('0.0.0.2', 80))]
1283
1284 def getaddrinfo_task(*args, **kwds):
1285 return self.loop.create_task(getaddrinfo(*args, **kwds))
1286
1287 self.loop.getaddrinfo = getaddrinfo_task
1288 self.loop.sock_connect = mock.Mock()
1289 self.loop.sock_connect.side_effect = OSError('Err2')
1290
1291 coro = self.loop.create_connection(
1292 MyProto, 'example.com', 80, family=socket.AF_INET,
1293 local_addr=(None, 8080))
1294 with self.assertRaises(OSError) as cm:
1295 self.loop.run_until_complete(coro)
1296
1297 self.assertTrue(str(cm.exception).startswith('Multiple exceptions: '))
1298 self.assertTrue(m_socket.socket.return_value.close.called)
1299
1300 coro = self.loop.create_connection(
1301 MyProto, 'example.com', 80, family=socket.AF_INET,
1302 local_addr=(None, 8080), all_errors=True)
1303 with self.assertRaises(ExceptionGroup) as cm:
1304 self.loop.run_until_complete(coro)
1305
1306 self.assertIsInstance(cm.exception, ExceptionGroup)
1307 for e in cm.exception.exceptions:
1308 self.assertIsInstance(e, OSError)
1309
1310 def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton):
1311 # Test the fallback code, even if this system has inet_pton.
1312 if not allow_inet_pton:
1313 del m_socket.inet_pton
1314
1315 m_socket.getaddrinfo = socket.getaddrinfo
1316 sock = m_socket.socket.return_value
1317
1318 self.loop._add_reader = mock.Mock()
1319 self.loop._add_writer = mock.Mock()
1320
1321 coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
1322 t, p = self.loop.run_until_complete(coro)
1323 try:
1324 sock.connect.assert_called_with(('1.2.3.4', 80))
1325 _, kwargs = m_socket.socket.call_args
1326 self.assertEqual(kwargs['family'], m_socket.AF_INET)
1327 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1328 finally:
1329 t.close()
1330 test_utils.run_briefly(self.loop) # allow transport to close
1331
1332 if socket_helper.IPV6_ENABLED:
1333 sock.family = socket.AF_INET6
1334 coro = self.loop.create_connection(asyncio.Protocol, '::1', 80)
1335 t, p = self.loop.run_until_complete(coro)
1336 try:
1337 # Without inet_pton we use getaddrinfo, which transforms
1338 # ('::1', 80) to ('::1', 80, 0, 0). The last 0s are flow info,
1339 # scope id.
1340 [address] = sock.connect.call_args[0]
1341 host, port = address[:2]
1342 self.assertRegex(host, r'::(0\.)*1')
1343 self.assertEqual(port, 80)
1344 _, kwargs = m_socket.socket.call_args
1345 self.assertEqual(kwargs['family'], m_socket.AF_INET6)
1346 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1347 finally:
1348 t.close()
1349 test_utils.run_briefly(self.loop) # allow transport to close
1350
1351 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
1352 @unittest.skipIf(sys.platform.startswith('aix'),
1353 "bpo-25545: IPv6 scope id and getaddrinfo() behave differently on AIX")
1354 @patch_socket
1355 def test_create_connection_ipv6_scope(self, m_socket):
1356 m_socket.getaddrinfo = socket.getaddrinfo
1357 sock = m_socket.socket.return_value
1358 sock.family = socket.AF_INET6
1359
1360 self.loop._add_reader = mock.Mock()
1361 self.loop._add_writer = mock.Mock()
1362
1363 coro = self.loop.create_connection(asyncio.Protocol, 'fe80::1%1', 80)
1364 t, p = self.loop.run_until_complete(coro)
1365 try:
1366 sock.connect.assert_called_with(('fe80::1', 80, 0, 1))
1367 _, kwargs = m_socket.socket.call_args
1368 self.assertEqual(kwargs['family'], m_socket.AF_INET6)
1369 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1370 finally:
1371 t.close()
1372 test_utils.run_briefly(self.loop) # allow transport to close
1373
1374 @patch_socket
1375 def test_create_connection_ip_addr(self, m_socket):
1376 self._test_create_connection_ip_addr(m_socket, True)
1377
1378 @patch_socket
1379 def test_create_connection_no_inet_pton(self, m_socket):
1380 self._test_create_connection_ip_addr(m_socket, False)
1381
1382 @patch_socket
1383 def test_create_connection_service_name(self, m_socket):
1384 m_socket.getaddrinfo = socket.getaddrinfo
1385 sock = m_socket.socket.return_value
1386
1387 self.loop._add_reader = mock.Mock()
1388 self.loop._add_writer = mock.Mock()
1389
1390 for service, port in ('http', 80), (b'http', 80):
1391 coro = self.loop.create_connection(asyncio.Protocol,
1392 '127.0.0.1', service)
1393
1394 t, p = self.loop.run_until_complete(coro)
1395 try:
1396 sock.connect.assert_called_with(('127.0.0.1', port))
1397 _, kwargs = m_socket.socket.call_args
1398 self.assertEqual(kwargs['family'], m_socket.AF_INET)
1399 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1400 finally:
1401 t.close()
1402 test_utils.run_briefly(self.loop) # allow transport to close
1403
1404 for service in 'nonsense', b'nonsense':
1405 coro = self.loop.create_connection(asyncio.Protocol,
1406 '127.0.0.1', service)
1407
1408 with self.assertRaises(OSError):
1409 self.loop.run_until_complete(coro)
1410
1411 def test_create_connection_no_local_addr(self):
1412 async def getaddrinfo(host, *args, **kw):
1413 if host == 'example.com':
1414 return [(2, 1, 6, '', ('107.6.106.82', 80)),
1415 (2, 1, 6, '', ('107.6.106.82', 80))]
1416 else:
1417 return []
1418
1419 def getaddrinfo_task(*args, **kwds):
1420 return self.loop.create_task(getaddrinfo(*args, **kwds))
1421 self.loop.getaddrinfo = getaddrinfo_task
1422
1423 coro = self.loop.create_connection(
1424 MyProto, 'example.com', 80, family=socket.AF_INET,
1425 local_addr=(None, 8080))
1426 self.assertRaises(
1427 OSError, self.loop.run_until_complete, coro)
1428
1429 @patch_socket
1430 def test_create_connection_bluetooth(self, m_socket):
1431 # See http://bugs.python.org/issue27136, fallback to getaddrinfo when
1432 # we can't recognize an address is resolved, e.g. a Bluetooth address.
1433 addr = ('00:01:02:03:04:05', 1)
1434
1435 def getaddrinfo(host, port, *args, **kw):
1436 self.assertEqual((host, port), addr)
1437 return [(999, 1, 999, '', (addr, 1))]
1438
1439 m_socket.getaddrinfo = getaddrinfo
1440 sock = m_socket.socket()
1441 coro = self.loop.sock_connect(sock, addr)
1442 self.loop.run_until_complete(coro)
1443
1444 def test_create_connection_ssl_server_hostname_default(self):
1445 self.loop.getaddrinfo = mock.Mock()
1446
1447 def mock_getaddrinfo(*args, **kwds):
1448 f = self.loop.create_future()
1449 f.set_result([(socket.AF_INET, socket.SOCK_STREAM,
1450 socket.SOL_TCP, '', ('1.2.3.4', 80))])
1451 return f
1452
1453 self.loop.getaddrinfo.side_effect = mock_getaddrinfo
1454 self.loop.sock_connect = mock.Mock()
1455 self.loop.sock_connect.return_value = self.loop.create_future()
1456 self.loop.sock_connect.return_value.set_result(None)
1457 self.loop._make_ssl_transport = mock.Mock()
1458
1459 class ESC[4;38;5;81m_SelectorTransportMock:
1460 _sock = None
1461
1462 def get_extra_info(self, key):
1463 return mock.Mock()
1464
1465 def close(self):
1466 self._sock.close()
1467
1468 def mock_make_ssl_transport(sock, protocol, sslcontext, waiter,
1469 **kwds):
1470 waiter.set_result(None)
1471 transport = _SelectorTransportMock()
1472 transport._sock = sock
1473 return transport
1474
1475 self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport
1476 ANY = mock.ANY
1477 handshake_timeout = object()
1478 shutdown_timeout = object()
1479 # First try the default server_hostname.
1480 self.loop._make_ssl_transport.reset_mock()
1481 coro = self.loop.create_connection(
1482 MyProto, 'python.org', 80, ssl=True,
1483 ssl_handshake_timeout=handshake_timeout,
1484 ssl_shutdown_timeout=shutdown_timeout)
1485 transport, _ = self.loop.run_until_complete(coro)
1486 transport.close()
1487 self.loop._make_ssl_transport.assert_called_with(
1488 ANY, ANY, ANY, ANY,
1489 server_side=False,
1490 server_hostname='python.org',
1491 ssl_handshake_timeout=handshake_timeout,
1492 ssl_shutdown_timeout=shutdown_timeout)
1493 # Next try an explicit server_hostname.
1494 self.loop._make_ssl_transport.reset_mock()
1495 coro = self.loop.create_connection(
1496 MyProto, 'python.org', 80, ssl=True,
1497 server_hostname='perl.com',
1498 ssl_handshake_timeout=handshake_timeout,
1499 ssl_shutdown_timeout=shutdown_timeout)
1500 transport, _ = self.loop.run_until_complete(coro)
1501 transport.close()
1502 self.loop._make_ssl_transport.assert_called_with(
1503 ANY, ANY, ANY, ANY,
1504 server_side=False,
1505 server_hostname='perl.com',
1506 ssl_handshake_timeout=handshake_timeout,
1507 ssl_shutdown_timeout=shutdown_timeout)
1508 # Finally try an explicit empty server_hostname.
1509 self.loop._make_ssl_transport.reset_mock()
1510 coro = self.loop.create_connection(
1511 MyProto, 'python.org', 80, ssl=True,
1512 server_hostname='',
1513 ssl_handshake_timeout=handshake_timeout,
1514 ssl_shutdown_timeout=shutdown_timeout)
1515 transport, _ = self.loop.run_until_complete(coro)
1516 transport.close()
1517 self.loop._make_ssl_transport.assert_called_with(
1518 ANY, ANY, ANY, ANY,
1519 server_side=False,
1520 server_hostname='',
1521 ssl_handshake_timeout=handshake_timeout,
1522 ssl_shutdown_timeout=shutdown_timeout)
1523
1524 def test_create_connection_no_ssl_server_hostname_errors(self):
1525 # When not using ssl, server_hostname must be None.
1526 coro = self.loop.create_connection(MyProto, 'python.org', 80,
1527 server_hostname='')
1528 self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1529 coro = self.loop.create_connection(MyProto, 'python.org', 80,
1530 server_hostname='python.org')
1531 self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1532
1533 def test_create_connection_ssl_server_hostname_errors(self):
1534 # When using ssl, server_hostname may be None if host is non-empty.
1535 coro = self.loop.create_connection(MyProto, '', 80, ssl=True)
1536 self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1537 coro = self.loop.create_connection(MyProto, None, 80, ssl=True)
1538 self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1539 sock = socket.socket()
1540 coro = self.loop.create_connection(MyProto, None, None,
1541 ssl=True, sock=sock)
1542 self.addCleanup(sock.close)
1543 self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1544
1545 def test_create_connection_ssl_timeout_for_plain_socket(self):
1546 coro = self.loop.create_connection(
1547 MyProto, 'example.com', 80, ssl_handshake_timeout=1)
1548 with self.assertRaisesRegex(
1549 ValueError,
1550 'ssl_handshake_timeout is only meaningful with ssl'):
1551 self.loop.run_until_complete(coro)
1552
1553 def test_create_server_empty_host(self):
1554 # if host is empty string use None instead
1555 host = object()
1556
1557 async def getaddrinfo(*args, **kw):
1558 nonlocal host
1559 host = args[0]
1560 return []
1561
1562 def getaddrinfo_task(*args, **kwds):
1563 return self.loop.create_task(getaddrinfo(*args, **kwds))
1564
1565 self.loop.getaddrinfo = getaddrinfo_task
1566 fut = self.loop.create_server(MyProto, '', 0)
1567 self.assertRaises(OSError, self.loop.run_until_complete, fut)
1568 self.assertIsNone(host)
1569
1570 def test_create_server_host_port_sock(self):
1571 fut = self.loop.create_server(
1572 MyProto, '0.0.0.0', 0, sock=object())
1573 self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1574
1575 def test_create_server_no_host_port_sock(self):
1576 fut = self.loop.create_server(MyProto)
1577 self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1578
1579 def test_create_server_no_getaddrinfo(self):
1580 getaddrinfo = self.loop.getaddrinfo = mock.Mock()
1581 getaddrinfo.return_value = self.loop.create_future()
1582 getaddrinfo.return_value.set_result(None)
1583
1584 f = self.loop.create_server(MyProto, 'python.org', 0)
1585 self.assertRaises(OSError, self.loop.run_until_complete, f)
1586
1587 @patch_socket
1588 def test_create_server_nosoreuseport(self, m_socket):
1589 m_socket.getaddrinfo = socket.getaddrinfo
1590 del m_socket.SO_REUSEPORT
1591 m_socket.socket.return_value = mock.Mock()
1592
1593 f = self.loop.create_server(
1594 MyProto, '0.0.0.0', 0, reuse_port=True)
1595
1596 self.assertRaises(ValueError, self.loop.run_until_complete, f)
1597
1598 @patch_socket
1599 def test_create_server_soreuseport_only_defined(self, m_socket):
1600 m_socket.getaddrinfo = socket.getaddrinfo
1601 m_socket.socket.return_value = mock.Mock()
1602 m_socket.SO_REUSEPORT = -1
1603
1604 f = self.loop.create_server(
1605 MyProto, '0.0.0.0', 0, reuse_port=True)
1606
1607 self.assertRaises(ValueError, self.loop.run_until_complete, f)
1608
1609 @patch_socket
1610 def test_create_server_cant_bind(self, m_socket):
1611
1612 class ESC[4;38;5;81mErr(ESC[4;38;5;149mOSError):
1613 strerror = 'error'
1614
1615 m_socket.getaddrinfo.return_value = [
1616 (2, 1, 6, '', ('127.0.0.1', 10100))]
1617 m_sock = m_socket.socket.return_value = mock.Mock()
1618 m_sock.bind.side_effect = Err
1619
1620 fut = self.loop.create_server(MyProto, '0.0.0.0', 0)
1621 self.assertRaises(OSError, self.loop.run_until_complete, fut)
1622 self.assertTrue(m_sock.close.called)
1623
1624 @patch_socket
1625 def test_create_datagram_endpoint_no_addrinfo(self, m_socket):
1626 m_socket.getaddrinfo.return_value = []
1627
1628 coro = self.loop.create_datagram_endpoint(
1629 MyDatagramProto, local_addr=('localhost', 0))
1630 self.assertRaises(
1631 OSError, self.loop.run_until_complete, coro)
1632
1633 def test_create_datagram_endpoint_addr_error(self):
1634 coro = self.loop.create_datagram_endpoint(
1635 MyDatagramProto, local_addr='localhost')
1636 self.assertRaises(
1637 TypeError, self.loop.run_until_complete, coro)
1638 coro = self.loop.create_datagram_endpoint(
1639 MyDatagramProto, local_addr=('localhost', 1, 2, 3))
1640 self.assertRaises(
1641 TypeError, self.loop.run_until_complete, coro)
1642
1643 def test_create_datagram_endpoint_connect_err(self):
1644 self.loop.sock_connect = mock.Mock()
1645 self.loop.sock_connect.side_effect = OSError
1646
1647 coro = self.loop.create_datagram_endpoint(
1648 asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0))
1649 self.assertRaises(
1650 OSError, self.loop.run_until_complete, coro)
1651
1652 def test_create_datagram_endpoint_allow_broadcast(self):
1653 protocol = MyDatagramProto(create_future=True, loop=self.loop)
1654 self.loop.sock_connect = sock_connect = mock.Mock()
1655 sock_connect.return_value = []
1656
1657 coro = self.loop.create_datagram_endpoint(
1658 lambda: protocol,
1659 remote_addr=('127.0.0.1', 0),
1660 allow_broadcast=True)
1661
1662 transport, _ = self.loop.run_until_complete(coro)
1663 self.assertFalse(sock_connect.called)
1664
1665 transport.close()
1666 self.loop.run_until_complete(protocol.done)
1667 self.assertEqual('CLOSED', protocol.state)
1668
1669 @patch_socket
1670 def test_create_datagram_endpoint_socket_err(self, m_socket):
1671 m_socket.getaddrinfo = socket.getaddrinfo
1672 m_socket.socket.side_effect = OSError
1673
1674 coro = self.loop.create_datagram_endpoint(
1675 asyncio.DatagramProtocol, family=socket.AF_INET)
1676 self.assertRaises(
1677 OSError, self.loop.run_until_complete, coro)
1678
1679 coro = self.loop.create_datagram_endpoint(
1680 asyncio.DatagramProtocol, local_addr=('127.0.0.1', 0))
1681 self.assertRaises(
1682 OSError, self.loop.run_until_complete, coro)
1683
1684 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
1685 def test_create_datagram_endpoint_no_matching_family(self):
1686 coro = self.loop.create_datagram_endpoint(
1687 asyncio.DatagramProtocol,
1688 remote_addr=('127.0.0.1', 0), local_addr=('::1', 0))
1689 self.assertRaises(
1690 ValueError, self.loop.run_until_complete, coro)
1691
1692 @patch_socket
1693 def test_create_datagram_endpoint_setblk_err(self, m_socket):
1694 m_socket.socket.return_value.setblocking.side_effect = OSError
1695
1696 coro = self.loop.create_datagram_endpoint(
1697 asyncio.DatagramProtocol, family=socket.AF_INET)
1698 self.assertRaises(
1699 OSError, self.loop.run_until_complete, coro)
1700 self.assertTrue(
1701 m_socket.socket.return_value.close.called)
1702
1703 def test_create_datagram_endpoint_noaddr_nofamily(self):
1704 coro = self.loop.create_datagram_endpoint(
1705 asyncio.DatagramProtocol)
1706 self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1707
1708 @patch_socket
1709 def test_create_datagram_endpoint_cant_bind(self, m_socket):
1710 class ESC[4;38;5;81mErr(ESC[4;38;5;149mOSError):
1711 pass
1712
1713 m_socket.getaddrinfo = socket.getaddrinfo
1714 m_sock = m_socket.socket.return_value = mock.Mock()
1715 m_sock.bind.side_effect = Err
1716
1717 fut = self.loop.create_datagram_endpoint(
1718 MyDatagramProto,
1719 local_addr=('127.0.0.1', 0), family=socket.AF_INET)
1720 self.assertRaises(Err, self.loop.run_until_complete, fut)
1721 self.assertTrue(m_sock.close.called)
1722
1723 def test_create_datagram_endpoint_sock(self):
1724 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1725 sock.bind(('127.0.0.1', 0))
1726 fut = self.loop.create_datagram_endpoint(
1727 lambda: MyDatagramProto(create_future=True, loop=self.loop),
1728 sock=sock)
1729 transport, protocol = self.loop.run_until_complete(fut)
1730 transport.close()
1731 self.loop.run_until_complete(protocol.done)
1732 self.assertEqual('CLOSED', protocol.state)
1733
1734 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
1735 def test_create_datagram_endpoint_sock_unix(self):
1736 fut = self.loop.create_datagram_endpoint(
1737 lambda: MyDatagramProto(create_future=True, loop=self.loop),
1738 family=socket.AF_UNIX)
1739 transport, protocol = self.loop.run_until_complete(fut)
1740 self.assertEqual(transport._sock.family, socket.AF_UNIX)
1741 transport.close()
1742 self.loop.run_until_complete(protocol.done)
1743 self.assertEqual('CLOSED', protocol.state)
1744
1745 @socket_helper.skip_unless_bind_unix_socket
1746 def test_create_datagram_endpoint_existing_sock_unix(self):
1747 with test_utils.unix_socket_path() as path:
1748 sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM)
1749 sock.bind(path)
1750 sock.close()
1751
1752 coro = self.loop.create_datagram_endpoint(
1753 lambda: MyDatagramProto(create_future=True, loop=self.loop),
1754 path, family=socket.AF_UNIX)
1755 transport, protocol = self.loop.run_until_complete(coro)
1756 transport.close()
1757 self.loop.run_until_complete(protocol.done)
1758
1759 def test_create_datagram_endpoint_sock_sockopts(self):
1760 class ESC[4;38;5;81mFakeSock:
1761 type = socket.SOCK_DGRAM
1762
1763 fut = self.loop.create_datagram_endpoint(
1764 MyDatagramProto, local_addr=('127.0.0.1', 0), sock=FakeSock())
1765 self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1766
1767 fut = self.loop.create_datagram_endpoint(
1768 MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=FakeSock())
1769 self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1770
1771 fut = self.loop.create_datagram_endpoint(
1772 MyDatagramProto, family=1, sock=FakeSock())
1773 self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1774
1775 fut = self.loop.create_datagram_endpoint(
1776 MyDatagramProto, proto=1, sock=FakeSock())
1777 self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1778
1779 fut = self.loop.create_datagram_endpoint(
1780 MyDatagramProto, flags=1, sock=FakeSock())
1781 self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1782
1783 fut = self.loop.create_datagram_endpoint(
1784 MyDatagramProto, reuse_port=True, sock=FakeSock())
1785 self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1786
1787 fut = self.loop.create_datagram_endpoint(
1788 MyDatagramProto, allow_broadcast=True, sock=FakeSock())
1789 self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1790
1791 @unittest.skipIf(sys.platform == 'vxworks',
1792 "SO_BROADCAST is enabled by default on VxWorks")
1793 def test_create_datagram_endpoint_sockopts(self):
1794 # Socket options should not be applied unless asked for.
1795 # SO_REUSEPORT is not available on all platforms.
1796
1797 coro = self.loop.create_datagram_endpoint(
1798 lambda: MyDatagramProto(create_future=True, loop=self.loop),
1799 local_addr=('127.0.0.1', 0))
1800 transport, protocol = self.loop.run_until_complete(coro)
1801 sock = transport.get_extra_info('socket')
1802
1803 reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
1804
1805 if reuseport_supported:
1806 self.assertFalse(
1807 sock.getsockopt(
1808 socket.SOL_SOCKET, socket.SO_REUSEPORT))
1809 self.assertFalse(
1810 sock.getsockopt(
1811 socket.SOL_SOCKET, socket.SO_BROADCAST))
1812
1813 transport.close()
1814 self.loop.run_until_complete(protocol.done)
1815 self.assertEqual('CLOSED', protocol.state)
1816
1817 coro = self.loop.create_datagram_endpoint(
1818 lambda: MyDatagramProto(create_future=True, loop=self.loop),
1819 local_addr=('127.0.0.1', 0),
1820 reuse_port=reuseport_supported,
1821 allow_broadcast=True)
1822 transport, protocol = self.loop.run_until_complete(coro)
1823 sock = transport.get_extra_info('socket')
1824
1825 self.assertFalse(
1826 sock.getsockopt(
1827 socket.SOL_SOCKET, socket.SO_REUSEADDR))
1828 if reuseport_supported:
1829 self.assertTrue(
1830 sock.getsockopt(
1831 socket.SOL_SOCKET, socket.SO_REUSEPORT))
1832 self.assertTrue(
1833 sock.getsockopt(
1834 socket.SOL_SOCKET, socket.SO_BROADCAST))
1835
1836 transport.close()
1837 self.loop.run_until_complete(protocol.done)
1838 self.assertEqual('CLOSED', protocol.state)
1839
1840 @patch_socket
1841 def test_create_datagram_endpoint_nosoreuseport(self, m_socket):
1842 del m_socket.SO_REUSEPORT
1843 m_socket.socket.return_value = mock.Mock()
1844
1845 coro = self.loop.create_datagram_endpoint(
1846 lambda: MyDatagramProto(loop=self.loop),
1847 local_addr=('127.0.0.1', 0),
1848 reuse_port=True)
1849
1850 self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1851
1852 @patch_socket
1853 def test_create_datagram_endpoint_ip_addr(self, m_socket):
1854 def getaddrinfo(*args, **kw):
1855 self.fail('should not have called getaddrinfo')
1856
1857 m_socket.getaddrinfo = getaddrinfo
1858 m_socket.socket.return_value.bind = bind = mock.Mock()
1859 self.loop._add_reader = mock.Mock()
1860
1861 reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
1862 coro = self.loop.create_datagram_endpoint(
1863 lambda: MyDatagramProto(loop=self.loop),
1864 local_addr=('1.2.3.4', 0),
1865 reuse_port=reuseport_supported)
1866
1867 t, p = self.loop.run_until_complete(coro)
1868 try:
1869 bind.assert_called_with(('1.2.3.4', 0))
1870 m_socket.socket.assert_called_with(family=m_socket.AF_INET,
1871 proto=m_socket.IPPROTO_UDP,
1872 type=m_socket.SOCK_DGRAM)
1873 finally:
1874 t.close()
1875 test_utils.run_briefly(self.loop) # allow transport to close
1876
1877 def test_accept_connection_retry(self):
1878 sock = mock.Mock()
1879 sock.accept.side_effect = BlockingIOError()
1880
1881 self.loop._accept_connection(MyProto, sock)
1882 self.assertFalse(sock.close.called)
1883
1884 @mock.patch('asyncio.base_events.logger')
1885 def test_accept_connection_exception(self, m_log):
1886 sock = mock.Mock()
1887 sock.fileno.return_value = 10
1888 sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
1889 self.loop._remove_reader = mock.Mock()
1890 self.loop.call_later = mock.Mock()
1891
1892 self.loop._accept_connection(MyProto, sock)
1893 self.assertTrue(m_log.error.called)
1894 self.assertFalse(sock.close.called)
1895 self.loop._remove_reader.assert_called_with(10)
1896 self.loop.call_later.assert_called_with(
1897 constants.ACCEPT_RETRY_DELAY,
1898 # self.loop._start_serving
1899 mock.ANY,
1900 MyProto, sock, None, None, mock.ANY, mock.ANY, mock.ANY)
1901
1902 def test_call_coroutine(self):
1903 async def simple_coroutine():
1904 pass
1905
1906 self.loop.set_debug(True)
1907 coro_func = simple_coroutine
1908 coro_obj = coro_func()
1909 self.addCleanup(coro_obj.close)
1910 for func in (coro_func, coro_obj):
1911 with self.assertRaises(TypeError):
1912 self.loop.call_soon(func)
1913 with self.assertRaises(TypeError):
1914 self.loop.call_soon_threadsafe(func)
1915 with self.assertRaises(TypeError):
1916 self.loop.call_later(60, func)
1917 with self.assertRaises(TypeError):
1918 self.loop.call_at(self.loop.time() + 60, func)
1919 with self.assertRaises(TypeError):
1920 self.loop.run_until_complete(
1921 self.loop.run_in_executor(None, func))
1922
1923 @mock.patch('asyncio.base_events.logger')
1924 def test_log_slow_callbacks(self, m_logger):
1925 def stop_loop_cb(loop):
1926 loop.stop()
1927
1928 async def stop_loop_coro(loop):
1929 loop.stop()
1930
1931 asyncio.set_event_loop(self.loop)
1932 self.loop.set_debug(True)
1933 self.loop.slow_callback_duration = 0.0
1934
1935 # slow callback
1936 self.loop.call_soon(stop_loop_cb, self.loop)
1937 self.loop.run_forever()
1938 fmt, *args = m_logger.warning.call_args[0]
1939 self.assertRegex(fmt % tuple(args),
1940 "^Executing <Handle.*stop_loop_cb.*> "
1941 "took .* seconds$")
1942
1943 # slow task
1944 asyncio.ensure_future(stop_loop_coro(self.loop), loop=self.loop)
1945 self.loop.run_forever()
1946 fmt, *args = m_logger.warning.call_args[0]
1947 self.assertRegex(fmt % tuple(args),
1948 "^Executing <Task.*stop_loop_coro.*> "
1949 "took .* seconds$")
1950
1951
1952 class ESC[4;38;5;81mRunningLoopTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1953
1954 def test_running_loop_within_a_loop(self):
1955 async def runner(loop):
1956 loop.run_forever()
1957
1958 loop = asyncio.new_event_loop()
1959 outer_loop = asyncio.new_event_loop()
1960 try:
1961 with self.assertRaisesRegex(RuntimeError,
1962 'while another loop is running'):
1963 outer_loop.run_until_complete(runner(loop))
1964 finally:
1965 loop.close()
1966 outer_loop.close()
1967
1968
1969 class ESC[4;38;5;81mBaseLoopSockSendfileTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1970
1971 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
1972
1973 class ESC[4;38;5;81mMyProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mProtocol):
1974
1975 def __init__(self, loop):
1976 self.started = False
1977 self.closed = False
1978 self.data = bytearray()
1979 self.fut = loop.create_future()
1980 self.transport = None
1981
1982 def connection_made(self, transport):
1983 self.started = True
1984 self.transport = transport
1985
1986 def data_received(self, data):
1987 self.data.extend(data)
1988
1989 def connection_lost(self, exc):
1990 self.closed = True
1991 self.fut.set_result(None)
1992 self.transport = None
1993
1994 async def wait_closed(self):
1995 await self.fut
1996
1997 @classmethod
1998 def setUpClass(cls):
1999 cls.__old_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE
2000 constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 16
2001 with open(os_helper.TESTFN, 'wb') as fp:
2002 fp.write(cls.DATA)
2003 super().setUpClass()
2004
2005 @classmethod
2006 def tearDownClass(cls):
2007 constants.SENDFILE_FALLBACK_READBUFFER_SIZE = cls.__old_bufsize
2008 os_helper.unlink(os_helper.TESTFN)
2009 super().tearDownClass()
2010
2011 def setUp(self):
2012 from asyncio.selector_events import BaseSelectorEventLoop
2013 # BaseSelectorEventLoop() has no native implementation
2014 self.loop = BaseSelectorEventLoop()
2015 self.set_event_loop(self.loop)
2016 self.file = open(os_helper.TESTFN, 'rb')
2017 self.addCleanup(self.file.close)
2018 super().setUp()
2019
2020 def make_socket(self, blocking=False):
2021 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2022 sock.setblocking(blocking)
2023 self.addCleanup(sock.close)
2024 return sock
2025
2026 def run_loop(self, coro):
2027 return self.loop.run_until_complete(coro)
2028
2029 def prepare(self):
2030 sock = self.make_socket()
2031 proto = self.MyProto(self.loop)
2032 server = self.run_loop(self.loop.create_server(
2033 lambda: proto, socket_helper.HOST, 0, family=socket.AF_INET))
2034 addr = server.sockets[0].getsockname()
2035
2036 for _ in range(10):
2037 try:
2038 self.run_loop(self.loop.sock_connect(sock, addr))
2039 except OSError:
2040 self.run_loop(asyncio.sleep(0.5))
2041 continue
2042 else:
2043 break
2044 else:
2045 # One last try, so we get the exception
2046 self.run_loop(self.loop.sock_connect(sock, addr))
2047
2048 def cleanup():
2049 server.close()
2050 sock.close()
2051 if proto.transport is not None:
2052 proto.transport.close()
2053 self.run_loop(proto.wait_closed())
2054 self.run_loop(server.wait_closed())
2055
2056 self.addCleanup(cleanup)
2057
2058 return sock, proto
2059
2060 def test__sock_sendfile_native_failure(self):
2061 sock, proto = self.prepare()
2062
2063 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
2064 "sendfile is not available"):
2065 self.run_loop(self.loop._sock_sendfile_native(sock, self.file,
2066 0, None))
2067
2068 self.assertEqual(proto.data, b'')
2069 self.assertEqual(self.file.tell(), 0)
2070
2071 def test_sock_sendfile_no_fallback(self):
2072 sock, proto = self.prepare()
2073
2074 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
2075 "sendfile is not available"):
2076 self.run_loop(self.loop.sock_sendfile(sock, self.file,
2077 fallback=False))
2078
2079 self.assertEqual(self.file.tell(), 0)
2080 self.assertEqual(proto.data, b'')
2081
2082 def test_sock_sendfile_fallback(self):
2083 sock, proto = self.prepare()
2084
2085 ret = self.run_loop(self.loop.sock_sendfile(sock, self.file))
2086 sock.close()
2087 self.run_loop(proto.wait_closed())
2088
2089 self.assertEqual(ret, len(self.DATA))
2090 self.assertEqual(self.file.tell(), len(self.DATA))
2091 self.assertEqual(proto.data, self.DATA)
2092
2093 def test_sock_sendfile_fallback_offset_and_count(self):
2094 sock, proto = self.prepare()
2095
2096 ret = self.run_loop(self.loop.sock_sendfile(sock, self.file,
2097 1000, 2000))
2098 sock.close()
2099 self.run_loop(proto.wait_closed())
2100
2101 self.assertEqual(ret, 2000)
2102 self.assertEqual(self.file.tell(), 3000)
2103 self.assertEqual(proto.data, self.DATA[1000:3000])
2104
2105 def test_blocking_socket(self):
2106 self.loop.set_debug(True)
2107 sock = self.make_socket(blocking=True)
2108 with self.assertRaisesRegex(ValueError, "must be non-blocking"):
2109 self.run_loop(self.loop.sock_sendfile(sock, self.file))
2110
2111 def test_nonbinary_file(self):
2112 sock = self.make_socket()
2113 with open(os_helper.TESTFN, encoding="utf-8") as f:
2114 with self.assertRaisesRegex(ValueError, "binary mode"):
2115 self.run_loop(self.loop.sock_sendfile(sock, f))
2116
2117 def test_nonstream_socket(self):
2118 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
2119 sock.setblocking(False)
2120 self.addCleanup(sock.close)
2121 with self.assertRaisesRegex(ValueError, "only SOCK_STREAM type"):
2122 self.run_loop(self.loop.sock_sendfile(sock, self.file))
2123
2124 def test_notint_count(self):
2125 sock = self.make_socket()
2126 with self.assertRaisesRegex(TypeError,
2127 "count must be a positive integer"):
2128 self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, 'count'))
2129
2130 def test_negative_count(self):
2131 sock = self.make_socket()
2132 with self.assertRaisesRegex(ValueError,
2133 "count must be a positive integer"):
2134 self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, -1))
2135
2136 def test_notint_offset(self):
2137 sock = self.make_socket()
2138 with self.assertRaisesRegex(TypeError,
2139 "offset must be a non-negative integer"):
2140 self.run_loop(self.loop.sock_sendfile(sock, self.file, 'offset'))
2141
2142 def test_negative_offset(self):
2143 sock = self.make_socket()
2144 with self.assertRaisesRegex(ValueError,
2145 "offset must be a non-negative integer"):
2146 self.run_loop(self.loop.sock_sendfile(sock, self.file, -1))
2147
2148
2149 class ESC[4;38;5;81mTestSelectorUtils(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2150 def check_set_nodelay(self, sock):
2151 opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
2152 self.assertFalse(opt)
2153
2154 base_events._set_nodelay(sock)
2155
2156 opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
2157 self.assertTrue(opt)
2158
2159 @unittest.skipUnless(hasattr(socket, 'TCP_NODELAY'),
2160 'need socket.TCP_NODELAY')
2161 def test_set_nodelay(self):
2162 sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM,
2163 proto=socket.IPPROTO_TCP)
2164 with sock:
2165 self.check_set_nodelay(sock)
2166
2167 sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM,
2168 proto=socket.IPPROTO_TCP)
2169 with sock:
2170 sock.setblocking(False)
2171 self.check_set_nodelay(sock)
2172
2173
2174
2175 if __name__ == '__main__':
2176 unittest.main()