python (3.12.0)
1 """Tests for events.py."""
2
3 import collections.abc
4 import concurrent.futures
5 import functools
6 import io
7 import multiprocessing
8 import os
9 import platform
10 import re
11 import signal
12 import socket
13 try:
14 import ssl
15 except ImportError:
16 ssl = None
17 import subprocess
18 import sys
19 import threading
20 import time
21 import types
22 import errno
23 import unittest
24 from unittest import mock
25 import weakref
26 import warnings
27 if sys.platform not in ('win32', 'vxworks'):
28 import tty
29
30 import asyncio
31 from asyncio import coroutines
32 from asyncio import events
33 from asyncio import selector_events
34 from multiprocessing.util import _cleanup_tests as multiprocessing_cleanup_tests
35 from test.test_asyncio import utils as test_utils
36 from test import support
37 from test.support import socket_helper
38 from test.support import threading_helper
39 from test.support import ALWAYS_EQ, LARGEST, SMALLEST
40
41
42 def tearDownModule():
43 asyncio.set_event_loop_policy(None)
44
45
46 def broken_unix_getsockname():
47 """Return True if the platform is Mac OS 10.4 or older."""
48 if sys.platform.startswith("aix"):
49 return True
50 elif sys.platform != 'darwin':
51 return False
52 version = platform.mac_ver()[0]
53 version = tuple(map(int, version.split('.')))
54 return version < (10, 5)
55
56
57 def _test_get_event_loop_new_process__sub_proc():
58 async def doit():
59 return 'hello'
60
61 loop = asyncio.new_event_loop()
62 asyncio.set_event_loop(loop)
63 return loop.run_until_complete(doit())
64
65
66 class ESC[4;38;5;81mCoroLike:
67 def send(self, v):
68 pass
69
70 def throw(self, *exc):
71 pass
72
73 def close(self):
74 pass
75
76 def __await__(self):
77 pass
78
79
80 class ESC[4;38;5;81mMyBaseProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mProtocol):
81 connected = None
82 done = None
83
84 def __init__(self, loop=None):
85 self.transport = None
86 self.state = 'INITIAL'
87 self.nbytes = 0
88 if loop is not None:
89 self.connected = loop.create_future()
90 self.done = loop.create_future()
91
92 def _assert_state(self, *expected):
93 if self.state not in expected:
94 raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
95
96 def connection_made(self, transport):
97 self.transport = transport
98 self._assert_state('INITIAL')
99 self.state = 'CONNECTED'
100 if self.connected:
101 self.connected.set_result(None)
102
103 def data_received(self, data):
104 self._assert_state('CONNECTED')
105 self.nbytes += len(data)
106
107 def eof_received(self):
108 self._assert_state('CONNECTED')
109 self.state = 'EOF'
110
111 def connection_lost(self, exc):
112 self._assert_state('CONNECTED', 'EOF')
113 self.state = 'CLOSED'
114 if self.done:
115 self.done.set_result(None)
116
117
118 class ESC[4;38;5;81mMyProto(ESC[4;38;5;149mMyBaseProto):
119 def connection_made(self, transport):
120 super().connection_made(transport)
121 transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
122
123
124 class ESC[4;38;5;81mMyDatagramProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mDatagramProtocol):
125 done = None
126
127 def __init__(self, loop=None):
128 self.state = 'INITIAL'
129 self.nbytes = 0
130 if loop is not None:
131 self.done = loop.create_future()
132
133 def _assert_state(self, expected):
134 if self.state != expected:
135 raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
136
137 def connection_made(self, transport):
138 self.transport = transport
139 self._assert_state('INITIAL')
140 self.state = 'INITIALIZED'
141
142 def datagram_received(self, data, addr):
143 self._assert_state('INITIALIZED')
144 self.nbytes += len(data)
145
146 def error_received(self, exc):
147 self._assert_state('INITIALIZED')
148
149 def connection_lost(self, exc):
150 self._assert_state('INITIALIZED')
151 self.state = 'CLOSED'
152 if self.done:
153 self.done.set_result(None)
154
155
156 class ESC[4;38;5;81mMyReadPipeProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mProtocol):
157 done = None
158
159 def __init__(self, loop=None):
160 self.state = ['INITIAL']
161 self.nbytes = 0
162 self.transport = None
163 if loop is not None:
164 self.done = loop.create_future()
165
166 def _assert_state(self, expected):
167 if self.state != expected:
168 raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
169
170 def connection_made(self, transport):
171 self.transport = transport
172 self._assert_state(['INITIAL'])
173 self.state.append('CONNECTED')
174
175 def data_received(self, data):
176 self._assert_state(['INITIAL', 'CONNECTED'])
177 self.nbytes += len(data)
178
179 def eof_received(self):
180 self._assert_state(['INITIAL', 'CONNECTED'])
181 self.state.append('EOF')
182
183 def connection_lost(self, exc):
184 if 'EOF' not in self.state:
185 self.state.append('EOF') # It is okay if EOF is missed.
186 self._assert_state(['INITIAL', 'CONNECTED', 'EOF'])
187 self.state.append('CLOSED')
188 if self.done:
189 self.done.set_result(None)
190
191
192 class ESC[4;38;5;81mMyWritePipeProto(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mBaseProtocol):
193 done = None
194
195 def __init__(self, loop=None):
196 self.state = 'INITIAL'
197 self.transport = None
198 if loop is not None:
199 self.done = loop.create_future()
200
201 def _assert_state(self, expected):
202 if self.state != expected:
203 raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
204
205 def connection_made(self, transport):
206 self.transport = transport
207 self._assert_state('INITIAL')
208 self.state = 'CONNECTED'
209
210 def connection_lost(self, exc):
211 self._assert_state('CONNECTED')
212 self.state = 'CLOSED'
213 if self.done:
214 self.done.set_result(None)
215
216
217 class ESC[4;38;5;81mMySubprocessProtocol(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mSubprocessProtocol):
218
219 def __init__(self, loop):
220 self.state = 'INITIAL'
221 self.transport = None
222 self.connected = loop.create_future()
223 self.completed = loop.create_future()
224 self.disconnects = {fd: loop.create_future() for fd in range(3)}
225 self.data = {1: b'', 2: b''}
226 self.returncode = None
227 self.got_data = {1: asyncio.Event(),
228 2: asyncio.Event()}
229
230 def _assert_state(self, expected):
231 if self.state != expected:
232 raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
233
234 def connection_made(self, transport):
235 self.transport = transport
236 self._assert_state('INITIAL')
237 self.state = 'CONNECTED'
238 self.connected.set_result(None)
239
240 def connection_lost(self, exc):
241 self._assert_state('CONNECTED')
242 self.state = 'CLOSED'
243 self.completed.set_result(None)
244
245 def pipe_data_received(self, fd, data):
246 self._assert_state('CONNECTED')
247 self.data[fd] += data
248 self.got_data[fd].set()
249
250 def pipe_connection_lost(self, fd, exc):
251 self._assert_state('CONNECTED')
252 if exc:
253 self.disconnects[fd].set_exception(exc)
254 else:
255 self.disconnects[fd].set_result(exc)
256
257 def process_exited(self):
258 self._assert_state('CONNECTED')
259 self.returncode = self.transport.get_returncode()
260
261
262 class ESC[4;38;5;81mEventLoopTestsMixin:
263
264 def setUp(self):
265 super().setUp()
266 self.loop = self.create_event_loop()
267 self.set_event_loop(self.loop)
268
269 def tearDown(self):
270 # just in case if we have transport close callbacks
271 if not self.loop.is_closed():
272 test_utils.run_briefly(self.loop)
273
274 self.doCleanups()
275 support.gc_collect()
276 super().tearDown()
277
278 def test_run_until_complete_nesting(self):
279 async def coro1():
280 await asyncio.sleep(0)
281
282 async def coro2():
283 self.assertTrue(self.loop.is_running())
284 self.loop.run_until_complete(coro1())
285
286 with self.assertWarnsRegex(
287 RuntimeWarning,
288 r"coroutine \S+ was never awaited"
289 ):
290 self.assertRaises(
291 RuntimeError, self.loop.run_until_complete, coro2())
292
293 # Note: because of the default Windows timing granularity of
294 # 15.6 msec, we use fairly long sleep times here (~100 msec).
295
296 def test_run_until_complete(self):
297 t0 = self.loop.time()
298 self.loop.run_until_complete(asyncio.sleep(0.1))
299 t1 = self.loop.time()
300 self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)
301
302 def test_run_until_complete_stopped(self):
303
304 async def cb():
305 self.loop.stop()
306 await asyncio.sleep(0.1)
307 task = cb()
308 self.assertRaises(RuntimeError,
309 self.loop.run_until_complete, task)
310
311 def test_call_later(self):
312 results = []
313
314 def callback(arg):
315 results.append(arg)
316 self.loop.stop()
317
318 self.loop.call_later(0.1, callback, 'hello world')
319 self.loop.run_forever()
320 self.assertEqual(results, ['hello world'])
321
322 def test_call_soon(self):
323 results = []
324
325 def callback(arg1, arg2):
326 results.append((arg1, arg2))
327 self.loop.stop()
328
329 self.loop.call_soon(callback, 'hello', 'world')
330 self.loop.run_forever()
331 self.assertEqual(results, [('hello', 'world')])
332
333 def test_call_soon_threadsafe(self):
334 results = []
335 lock = threading.Lock()
336
337 def callback(arg):
338 results.append(arg)
339 if len(results) >= 2:
340 self.loop.stop()
341
342 def run_in_thread():
343 self.loop.call_soon_threadsafe(callback, 'hello')
344 lock.release()
345
346 lock.acquire()
347 t = threading.Thread(target=run_in_thread)
348 t.start()
349
350 with lock:
351 self.loop.call_soon(callback, 'world')
352 self.loop.run_forever()
353 t.join()
354 self.assertEqual(results, ['hello', 'world'])
355
356 def test_call_soon_threadsafe_same_thread(self):
357 results = []
358
359 def callback(arg):
360 results.append(arg)
361 if len(results) >= 2:
362 self.loop.stop()
363
364 self.loop.call_soon_threadsafe(callback, 'hello')
365 self.loop.call_soon(callback, 'world')
366 self.loop.run_forever()
367 self.assertEqual(results, ['hello', 'world'])
368
369 def test_run_in_executor(self):
370 def run(arg):
371 return (arg, threading.get_ident())
372 f2 = self.loop.run_in_executor(None, run, 'yo')
373 res, thread_id = self.loop.run_until_complete(f2)
374 self.assertEqual(res, 'yo')
375 self.assertNotEqual(thread_id, threading.get_ident())
376
377 def test_run_in_executor_cancel(self):
378 called = False
379
380 def patched_call_soon(*args):
381 nonlocal called
382 called = True
383
384 def run():
385 time.sleep(0.05)
386
387 f2 = self.loop.run_in_executor(None, run)
388 f2.cancel()
389 self.loop.run_until_complete(
390 self.loop.shutdown_default_executor())
391 self.loop.close()
392 self.loop.call_soon = patched_call_soon
393 self.loop.call_soon_threadsafe = patched_call_soon
394 time.sleep(0.4)
395 self.assertFalse(called)
396
397 def test_reader_callback(self):
398 r, w = socket.socketpair()
399 r.setblocking(False)
400 bytes_read = bytearray()
401
402 def reader():
403 try:
404 data = r.recv(1024)
405 except BlockingIOError:
406 # Spurious readiness notifications are possible
407 # at least on Linux -- see man select.
408 return
409 if data:
410 bytes_read.extend(data)
411 else:
412 self.assertTrue(self.loop.remove_reader(r.fileno()))
413 r.close()
414
415 self.loop.add_reader(r.fileno(), reader)
416 self.loop.call_soon(w.send, b'abc')
417 test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3)
418 self.loop.call_soon(w.send, b'def')
419 test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6)
420 self.loop.call_soon(w.close)
421 self.loop.call_soon(self.loop.stop)
422 self.loop.run_forever()
423 self.assertEqual(bytes_read, b'abcdef')
424
425 def test_writer_callback(self):
426 r, w = socket.socketpair()
427 w.setblocking(False)
428
429 def writer(data):
430 w.send(data)
431 self.loop.stop()
432
433 data = b'x' * 1024
434 self.loop.add_writer(w.fileno(), writer, data)
435 self.loop.run_forever()
436
437 self.assertTrue(self.loop.remove_writer(w.fileno()))
438 self.assertFalse(self.loop.remove_writer(w.fileno()))
439
440 w.close()
441 read = r.recv(len(data) * 2)
442 r.close()
443 self.assertEqual(read, data)
444
445 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL')
446 def test_add_signal_handler(self):
447 caught = 0
448
449 def my_handler():
450 nonlocal caught
451 caught += 1
452
453 # Check error behavior first.
454 self.assertRaises(
455 TypeError, self.loop.add_signal_handler, 'boom', my_handler)
456 self.assertRaises(
457 TypeError, self.loop.remove_signal_handler, 'boom')
458 self.assertRaises(
459 ValueError, self.loop.add_signal_handler, signal.NSIG+1,
460 my_handler)
461 self.assertRaises(
462 ValueError, self.loop.remove_signal_handler, signal.NSIG+1)
463 self.assertRaises(
464 ValueError, self.loop.add_signal_handler, 0, my_handler)
465 self.assertRaises(
466 ValueError, self.loop.remove_signal_handler, 0)
467 self.assertRaises(
468 ValueError, self.loop.add_signal_handler, -1, my_handler)
469 self.assertRaises(
470 ValueError, self.loop.remove_signal_handler, -1)
471 self.assertRaises(
472 RuntimeError, self.loop.add_signal_handler, signal.SIGKILL,
473 my_handler)
474 # Removing SIGKILL doesn't raise, since we don't call signal().
475 self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL))
476 # Now set a handler and handle it.
477 self.loop.add_signal_handler(signal.SIGINT, my_handler)
478
479 os.kill(os.getpid(), signal.SIGINT)
480 test_utils.run_until(self.loop, lambda: caught)
481
482 # Removing it should restore the default handler.
483 self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
484 self.assertEqual(signal.getsignal(signal.SIGINT),
485 signal.default_int_handler)
486 # Removing again returns False.
487 self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT))
488
489 @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
490 @unittest.skipUnless(hasattr(signal, 'setitimer'),
491 'need signal.setitimer()')
492 def test_signal_handling_while_selecting(self):
493 # Test with a signal actually arriving during a select() call.
494 caught = 0
495
496 def my_handler():
497 nonlocal caught
498 caught += 1
499 self.loop.stop()
500
501 self.loop.add_signal_handler(signal.SIGALRM, my_handler)
502
503 signal.setitimer(signal.ITIMER_REAL, 0.01, 0) # Send SIGALRM once.
504 self.loop.call_later(60, self.loop.stop)
505 self.loop.run_forever()
506 self.assertEqual(caught, 1)
507
508 @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
509 @unittest.skipUnless(hasattr(signal, 'setitimer'),
510 'need signal.setitimer()')
511 def test_signal_handling_args(self):
512 some_args = (42,)
513 caught = 0
514
515 def my_handler(*args):
516 nonlocal caught
517 caught += 1
518 self.assertEqual(args, some_args)
519 self.loop.stop()
520
521 self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args)
522
523 signal.setitimer(signal.ITIMER_REAL, 0.1, 0) # Send SIGALRM once.
524 self.loop.call_later(60, self.loop.stop)
525 self.loop.run_forever()
526 self.assertEqual(caught, 1)
527
528 def _basetest_create_connection(self, connection_fut, check_sockname=True):
529 tr, pr = self.loop.run_until_complete(connection_fut)
530 self.assertIsInstance(tr, asyncio.Transport)
531 self.assertIsInstance(pr, asyncio.Protocol)
532 self.assertIs(pr.transport, tr)
533 if check_sockname:
534 self.assertIsNotNone(tr.get_extra_info('sockname'))
535 self.loop.run_until_complete(pr.done)
536 self.assertGreater(pr.nbytes, 0)
537 tr.close()
538
539 def test_create_connection(self):
540 with test_utils.run_test_server() as httpd:
541 conn_fut = self.loop.create_connection(
542 lambda: MyProto(loop=self.loop), *httpd.address)
543 self._basetest_create_connection(conn_fut)
544
545 @socket_helper.skip_unless_bind_unix_socket
546 def test_create_unix_connection(self):
547 # Issue #20682: On Mac OS X Tiger, getsockname() returns a
548 # zero-length address for UNIX socket.
549 check_sockname = not broken_unix_getsockname()
550
551 with test_utils.run_test_unix_server() as httpd:
552 conn_fut = self.loop.create_unix_connection(
553 lambda: MyProto(loop=self.loop), httpd.address)
554 self._basetest_create_connection(conn_fut, check_sockname)
555
556 def check_ssl_extra_info(self, client, check_sockname=True,
557 peername=None, peercert={}):
558 if check_sockname:
559 self.assertIsNotNone(client.get_extra_info('sockname'))
560 if peername:
561 self.assertEqual(peername,
562 client.get_extra_info('peername'))
563 else:
564 self.assertIsNotNone(client.get_extra_info('peername'))
565 self.assertEqual(peercert,
566 client.get_extra_info('peercert'))
567
568 # test SSL cipher
569 cipher = client.get_extra_info('cipher')
570 self.assertIsInstance(cipher, tuple)
571 self.assertEqual(len(cipher), 3, cipher)
572 self.assertIsInstance(cipher[0], str)
573 self.assertIsInstance(cipher[1], str)
574 self.assertIsInstance(cipher[2], int)
575
576 # test SSL object
577 sslobj = client.get_extra_info('ssl_object')
578 self.assertIsNotNone(sslobj)
579 self.assertEqual(sslobj.compression(),
580 client.get_extra_info('compression'))
581 self.assertEqual(sslobj.cipher(),
582 client.get_extra_info('cipher'))
583 self.assertEqual(sslobj.getpeercert(),
584 client.get_extra_info('peercert'))
585 self.assertEqual(sslobj.compression(),
586 client.get_extra_info('compression'))
587
588 def _basetest_create_ssl_connection(self, connection_fut,
589 check_sockname=True,
590 peername=None):
591 tr, pr = self.loop.run_until_complete(connection_fut)
592 self.assertIsInstance(tr, asyncio.Transport)
593 self.assertIsInstance(pr, asyncio.Protocol)
594 self.assertTrue('ssl' in tr.__class__.__name__.lower())
595 self.check_ssl_extra_info(tr, check_sockname, peername)
596 self.loop.run_until_complete(pr.done)
597 self.assertGreater(pr.nbytes, 0)
598 tr.close()
599
600 def _test_create_ssl_connection(self, httpd, create_connection,
601 check_sockname=True, peername=None):
602 conn_fut = create_connection(ssl=test_utils.dummy_ssl_context())
603 self._basetest_create_ssl_connection(conn_fut, check_sockname,
604 peername)
605
606 # ssl.Purpose was introduced in Python 3.4
607 if hasattr(ssl, 'Purpose'):
608 def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
609 cafile=None, capath=None,
610 cadata=None):
611 """
612 A ssl.create_default_context() replacement that doesn't enable
613 cert validation.
614 """
615 self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH)
616 return test_utils.dummy_ssl_context()
617
618 # With ssl=True, ssl.create_default_context() should be called
619 with mock.patch('ssl.create_default_context',
620 side_effect=_dummy_ssl_create_context) as m:
621 conn_fut = create_connection(ssl=True)
622 self._basetest_create_ssl_connection(conn_fut, check_sockname,
623 peername)
624 self.assertEqual(m.call_count, 1)
625
626 # With the real ssl.create_default_context(), certificate
627 # validation will fail
628 with self.assertRaises(ssl.SSLError) as cm:
629 conn_fut = create_connection(ssl=True)
630 # Ignore the "SSL handshake failed" log in debug mode
631 with test_utils.disable_logger():
632 self._basetest_create_ssl_connection(conn_fut, check_sockname,
633 peername)
634
635 self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
636
637 @unittest.skipIf(ssl is None, 'No ssl module')
638 def test_create_ssl_connection(self):
639 with test_utils.run_test_server(use_ssl=True) as httpd:
640 create_connection = functools.partial(
641 self.loop.create_connection,
642 lambda: MyProto(loop=self.loop),
643 *httpd.address)
644 self._test_create_ssl_connection(httpd, create_connection,
645 peername=httpd.address)
646
647 @socket_helper.skip_unless_bind_unix_socket
648 @unittest.skipIf(ssl is None, 'No ssl module')
649 def test_create_ssl_unix_connection(self):
650 # Issue #20682: On Mac OS X Tiger, getsockname() returns a
651 # zero-length address for UNIX socket.
652 check_sockname = not broken_unix_getsockname()
653
654 with test_utils.run_test_unix_server(use_ssl=True) as httpd:
655 create_connection = functools.partial(
656 self.loop.create_unix_connection,
657 lambda: MyProto(loop=self.loop), httpd.address,
658 server_hostname='127.0.0.1')
659
660 self._test_create_ssl_connection(httpd, create_connection,
661 check_sockname,
662 peername=httpd.address)
663
664 def test_create_connection_local_addr(self):
665 with test_utils.run_test_server() as httpd:
666 port = socket_helper.find_unused_port()
667 f = self.loop.create_connection(
668 lambda: MyProto(loop=self.loop),
669 *httpd.address, local_addr=(httpd.address[0], port))
670 tr, pr = self.loop.run_until_complete(f)
671 expected = pr.transport.get_extra_info('sockname')[1]
672 self.assertEqual(port, expected)
673 tr.close()
674
675 @socket_helper.skip_if_tcp_blackhole
676 def test_create_connection_local_addr_skip_different_family(self):
677 # See https://github.com/python/cpython/issues/86508
678 port1 = socket_helper.find_unused_port()
679 port2 = socket_helper.find_unused_port()
680 getaddrinfo_orig = self.loop.getaddrinfo
681
682 async def getaddrinfo(host, port, *args, **kwargs):
683 if port == port2:
684 return [(socket.AF_INET6, socket.SOCK_STREAM, 0, '', ('::1', 0, 0, 0)),
685 (socket.AF_INET, socket.SOCK_STREAM, 0, '', ('127.0.0.1', 0))]
686 return await getaddrinfo_orig(host, port, *args, **kwargs)
687
688 self.loop.getaddrinfo = getaddrinfo
689
690 f = self.loop.create_connection(
691 lambda: MyProto(loop=self.loop),
692 'localhost', port1, local_addr=('localhost', port2))
693
694 with self.assertRaises(OSError):
695 self.loop.run_until_complete(f)
696
697 @socket_helper.skip_if_tcp_blackhole
698 def test_create_connection_local_addr_nomatch_family(self):
699 # See https://github.com/python/cpython/issues/86508
700 port1 = socket_helper.find_unused_port()
701 port2 = socket_helper.find_unused_port()
702 getaddrinfo_orig = self.loop.getaddrinfo
703
704 async def getaddrinfo(host, port, *args, **kwargs):
705 if port == port2:
706 return [(socket.AF_INET6, socket.SOCK_STREAM, 0, '', ('::1', 0, 0, 0))]
707 return await getaddrinfo_orig(host, port, *args, **kwargs)
708
709 self.loop.getaddrinfo = getaddrinfo
710
711 f = self.loop.create_connection(
712 lambda: MyProto(loop=self.loop),
713 'localhost', port1, local_addr=('localhost', port2))
714
715 with self.assertRaises(OSError):
716 self.loop.run_until_complete(f)
717
718 def test_create_connection_local_addr_in_use(self):
719 with test_utils.run_test_server() as httpd:
720 f = self.loop.create_connection(
721 lambda: MyProto(loop=self.loop),
722 *httpd.address, local_addr=httpd.address)
723 with self.assertRaises(OSError) as cm:
724 self.loop.run_until_complete(f)
725 self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
726 self.assertIn(str(httpd.address), cm.exception.strerror)
727
728 def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None):
729 loop = self.loop
730
731 class ESC[4;38;5;81mMyProto(ESC[4;38;5;149mMyBaseProto):
732
733 def connection_lost(self, exc):
734 super().connection_lost(exc)
735 loop.call_soon(loop.stop)
736
737 def data_received(self, data):
738 super().data_received(data)
739 self.transport.write(expected_response)
740
741 lsock = socket.create_server(('127.0.0.1', 0), backlog=1)
742 addr = lsock.getsockname()
743
744 message = b'test data'
745 response = None
746 expected_response = b'roger'
747
748 def client():
749 nonlocal response
750 try:
751 csock = socket.socket()
752 if client_ssl is not None:
753 csock = client_ssl.wrap_socket(csock)
754 csock.connect(addr)
755 csock.sendall(message)
756 response = csock.recv(99)
757 csock.close()
758 except Exception as exc:
759 print(
760 "Failure in client thread in test_connect_accepted_socket",
761 exc)
762
763 thread = threading.Thread(target=client, daemon=True)
764 thread.start()
765
766 conn, _ = lsock.accept()
767 proto = MyProto(loop=loop)
768 proto.loop = loop
769 loop.run_until_complete(
770 loop.connect_accepted_socket(
771 (lambda: proto), conn, ssl=server_ssl))
772 loop.run_forever()
773 proto.transport.close()
774 lsock.close()
775
776 threading_helper.join_thread(thread)
777 self.assertFalse(thread.is_alive())
778 self.assertEqual(proto.state, 'CLOSED')
779 self.assertEqual(proto.nbytes, len(message))
780 self.assertEqual(response, expected_response)
781
782 @unittest.skipIf(ssl is None, 'No ssl module')
783 def test_ssl_connect_accepted_socket(self):
784 server_context = test_utils.simple_server_sslcontext()
785 client_context = test_utils.simple_client_sslcontext()
786
787 self.test_connect_accepted_socket(server_context, client_context)
788
789 def test_connect_accepted_socket_ssl_timeout_for_plain_socket(self):
790 sock = socket.socket()
791 self.addCleanup(sock.close)
792 coro = self.loop.connect_accepted_socket(
793 MyProto, sock, ssl_handshake_timeout=support.LOOPBACK_TIMEOUT)
794 with self.assertRaisesRegex(
795 ValueError,
796 'ssl_handshake_timeout is only meaningful with ssl'):
797 self.loop.run_until_complete(coro)
798
799 @mock.patch('asyncio.base_events.socket')
800 def create_server_multiple_hosts(self, family, hosts, mock_sock):
801 async def getaddrinfo(host, port, *args, **kw):
802 if family == socket.AF_INET:
803 return [(family, socket.SOCK_STREAM, 6, '', (host, port))]
804 else:
805 return [(family, socket.SOCK_STREAM, 6, '', (host, port, 0, 0))]
806
807 def getaddrinfo_task(*args, **kwds):
808 return self.loop.create_task(getaddrinfo(*args, **kwds))
809
810 unique_hosts = set(hosts)
811
812 if family == socket.AF_INET:
813 mock_sock.socket().getsockbyname.side_effect = [
814 (host, 80) for host in unique_hosts]
815 else:
816 mock_sock.socket().getsockbyname.side_effect = [
817 (host, 80, 0, 0) for host in unique_hosts]
818 self.loop.getaddrinfo = getaddrinfo_task
819 self.loop._start_serving = mock.Mock()
820 self.loop._stop_serving = mock.Mock()
821 f = self.loop.create_server(lambda: MyProto(self.loop), hosts, 80)
822 server = self.loop.run_until_complete(f)
823 self.addCleanup(server.close)
824 server_hosts = {sock.getsockbyname()[0] for sock in server.sockets}
825 self.assertEqual(server_hosts, unique_hosts)
826
827 def test_create_server_multiple_hosts_ipv4(self):
828 self.create_server_multiple_hosts(socket.AF_INET,
829 ['1.2.3.4', '5.6.7.8', '1.2.3.4'])
830
831 def test_create_server_multiple_hosts_ipv6(self):
832 self.create_server_multiple_hosts(socket.AF_INET6,
833 ['::1', '::2', '::1'])
834
835 def test_create_server(self):
836 proto = MyProto(self.loop)
837 f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
838 server = self.loop.run_until_complete(f)
839 self.assertEqual(len(server.sockets), 1)
840 sock = server.sockets[0]
841 host, port = sock.getsockname()
842 self.assertEqual(host, '0.0.0.0')
843 client = socket.socket()
844 client.connect(('127.0.0.1', port))
845 client.sendall(b'xxx')
846
847 self.loop.run_until_complete(proto.connected)
848 self.assertEqual('CONNECTED', proto.state)
849
850 test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
851 self.assertEqual(3, proto.nbytes)
852
853 # extra info is available
854 self.assertIsNotNone(proto.transport.get_extra_info('sockname'))
855 self.assertEqual('127.0.0.1',
856 proto.transport.get_extra_info('peername')[0])
857
858 # close connection
859 proto.transport.close()
860 self.loop.run_until_complete(proto.done)
861
862 self.assertEqual('CLOSED', proto.state)
863
864 # the client socket must be closed after to avoid ECONNRESET upon
865 # recv()/send() on the serving socket
866 client.close()
867
868 # close server
869 server.close()
870
871 def test_create_server_trsock(self):
872 proto = MyProto(self.loop)
873 f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
874 server = self.loop.run_until_complete(f)
875 self.assertEqual(len(server.sockets), 1)
876 sock = server.sockets[0]
877 self.assertIsInstance(sock, asyncio.trsock.TransportSocket)
878 host, port = sock.getsockname()
879 self.assertEqual(host, '0.0.0.0')
880 dup = sock.dup()
881 self.addCleanup(dup.close)
882 self.assertIsInstance(dup, socket.socket)
883 self.assertFalse(sock.get_inheritable())
884 with self.assertRaises(ValueError):
885 sock.settimeout(1)
886 sock.settimeout(0)
887 self.assertEqual(sock.gettimeout(), 0)
888 with self.assertRaises(ValueError):
889 sock.setblocking(True)
890 sock.setblocking(False)
891 server.close()
892
893
894 @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT')
895 def test_create_server_reuse_port(self):
896 proto = MyProto(self.loop)
897 f = self.loop.create_server(
898 lambda: proto, '0.0.0.0', 0)
899 server = self.loop.run_until_complete(f)
900 self.assertEqual(len(server.sockets), 1)
901 sock = server.sockets[0]
902 self.assertFalse(
903 sock.getsockopt(
904 socket.SOL_SOCKET, socket.SO_REUSEPORT))
905 server.close()
906
907 test_utils.run_briefly(self.loop)
908
909 proto = MyProto(self.loop)
910 f = self.loop.create_server(
911 lambda: proto, '0.0.0.0', 0, reuse_port=True)
912 server = self.loop.run_until_complete(f)
913 self.assertEqual(len(server.sockets), 1)
914 sock = server.sockets[0]
915 self.assertTrue(
916 sock.getsockopt(
917 socket.SOL_SOCKET, socket.SO_REUSEPORT))
918 server.close()
919
920 def _make_unix_server(self, factory, **kwargs):
921 path = test_utils.gen_unix_socket_path()
922 self.addCleanup(lambda: os.path.exists(path) and os.unlink(path))
923
924 f = self.loop.create_unix_server(factory, path, **kwargs)
925 server = self.loop.run_until_complete(f)
926
927 return server, path
928
929 @socket_helper.skip_unless_bind_unix_socket
930 def test_create_unix_server(self):
931 proto = MyProto(loop=self.loop)
932 server, path = self._make_unix_server(lambda: proto)
933 self.assertEqual(len(server.sockets), 1)
934
935 client = socket.socket(socket.AF_UNIX)
936 client.connect(path)
937 client.sendall(b'xxx')
938
939 self.loop.run_until_complete(proto.connected)
940 self.assertEqual('CONNECTED', proto.state)
941 test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
942 self.assertEqual(3, proto.nbytes)
943
944 # close connection
945 proto.transport.close()
946 self.loop.run_until_complete(proto.done)
947
948 self.assertEqual('CLOSED', proto.state)
949
950 # the client socket must be closed after to avoid ECONNRESET upon
951 # recv()/send() on the serving socket
952 client.close()
953
954 # close server
955 server.close()
956
957 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
958 def test_create_unix_server_path_socket_error(self):
959 proto = MyProto(loop=self.loop)
960 sock = socket.socket()
961 with sock:
962 f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock)
963 with self.assertRaisesRegex(ValueError,
964 'path and sock can not be specified '
965 'at the same time'):
966 self.loop.run_until_complete(f)
967
968 def _create_ssl_context(self, certfile, keyfile=None):
969 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
970 sslcontext.options |= ssl.OP_NO_SSLv2
971 sslcontext.load_cert_chain(certfile, keyfile)
972 return sslcontext
973
974 def _make_ssl_server(self, factory, certfile, keyfile=None):
975 sslcontext = self._create_ssl_context(certfile, keyfile)
976
977 f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext)
978 server = self.loop.run_until_complete(f)
979
980 sock = server.sockets[0]
981 host, port = sock.getsockname()
982 self.assertEqual(host, '127.0.0.1')
983 return server, host, port
984
985 def _make_ssl_unix_server(self, factory, certfile, keyfile=None):
986 sslcontext = self._create_ssl_context(certfile, keyfile)
987 return self._make_unix_server(factory, ssl=sslcontext)
988
989 @unittest.skipIf(ssl is None, 'No ssl module')
990 def test_create_server_ssl(self):
991 proto = MyProto(loop=self.loop)
992 server, host, port = self._make_ssl_server(
993 lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)
994
995 f_c = self.loop.create_connection(MyBaseProto, host, port,
996 ssl=test_utils.dummy_ssl_context())
997 client, pr = self.loop.run_until_complete(f_c)
998
999 client.write(b'xxx')
1000 self.loop.run_until_complete(proto.connected)
1001 self.assertEqual('CONNECTED', proto.state)
1002
1003 test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
1004 self.assertEqual(3, proto.nbytes)
1005
1006 # extra info is available
1007 self.check_ssl_extra_info(client, peername=(host, port))
1008
1009 # close connection
1010 proto.transport.close()
1011 self.loop.run_until_complete(proto.done)
1012 self.assertEqual('CLOSED', proto.state)
1013
1014 # the client socket must be closed after to avoid ECONNRESET upon
1015 # recv()/send() on the serving socket
1016 client.close()
1017
1018 # stop serving
1019 server.close()
1020
1021 @socket_helper.skip_unless_bind_unix_socket
1022 @unittest.skipIf(ssl is None, 'No ssl module')
1023 def test_create_unix_server_ssl(self):
1024 proto = MyProto(loop=self.loop)
1025 server, path = self._make_ssl_unix_server(
1026 lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)
1027
1028 f_c = self.loop.create_unix_connection(
1029 MyBaseProto, path, ssl=test_utils.dummy_ssl_context(),
1030 server_hostname='')
1031
1032 client, pr = self.loop.run_until_complete(f_c)
1033
1034 client.write(b'xxx')
1035 self.loop.run_until_complete(proto.connected)
1036 self.assertEqual('CONNECTED', proto.state)
1037 test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
1038 self.assertEqual(3, proto.nbytes)
1039
1040 # close connection
1041 proto.transport.close()
1042 self.loop.run_until_complete(proto.done)
1043 self.assertEqual('CLOSED', proto.state)
1044
1045 # the client socket must be closed after to avoid ECONNRESET upon
1046 # recv()/send() on the serving socket
1047 client.close()
1048
1049 # stop serving
1050 server.close()
1051
1052 @unittest.skipIf(ssl is None, 'No ssl module')
1053 def test_create_server_ssl_verify_failed(self):
1054 proto = MyProto(loop=self.loop)
1055 server, host, port = self._make_ssl_server(
1056 lambda: proto, test_utils.SIGNED_CERTFILE)
1057
1058 sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1059 sslcontext_client.options |= ssl.OP_NO_SSLv2
1060 sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1061 if hasattr(sslcontext_client, 'check_hostname'):
1062 sslcontext_client.check_hostname = True
1063
1064
1065 # no CA loaded
1066 f_c = self.loop.create_connection(MyProto, host, port,
1067 ssl=sslcontext_client)
1068 with mock.patch.object(self.loop, 'call_exception_handler'):
1069 with test_utils.disable_logger():
1070 with self.assertRaisesRegex(ssl.SSLError,
1071 '(?i)certificate.verify.failed'):
1072 self.loop.run_until_complete(f_c)
1073
1074 # execute the loop to log the connection error
1075 test_utils.run_briefly(self.loop)
1076
1077 # close connection
1078 self.assertIsNone(proto.transport)
1079 server.close()
1080
1081 @socket_helper.skip_unless_bind_unix_socket
1082 @unittest.skipIf(ssl is None, 'No ssl module')
1083 def test_create_unix_server_ssl_verify_failed(self):
1084 proto = MyProto(loop=self.loop)
1085 server, path = self._make_ssl_unix_server(
1086 lambda: proto, test_utils.SIGNED_CERTFILE)
1087
1088 sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1089 sslcontext_client.options |= ssl.OP_NO_SSLv2
1090 sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1091 if hasattr(sslcontext_client, 'check_hostname'):
1092 sslcontext_client.check_hostname = True
1093
1094 # no CA loaded
1095 f_c = self.loop.create_unix_connection(MyProto, path,
1096 ssl=sslcontext_client,
1097 server_hostname='invalid')
1098 with mock.patch.object(self.loop, 'call_exception_handler'):
1099 with test_utils.disable_logger():
1100 with self.assertRaisesRegex(ssl.SSLError,
1101 '(?i)certificate.verify.failed'):
1102 self.loop.run_until_complete(f_c)
1103
1104 # execute the loop to log the connection error
1105 test_utils.run_briefly(self.loop)
1106
1107 # close connection
1108 self.assertIsNone(proto.transport)
1109 server.close()
1110
1111 @unittest.skipIf(ssl is None, 'No ssl module')
1112 def test_create_server_ssl_match_failed(self):
1113 proto = MyProto(loop=self.loop)
1114 server, host, port = self._make_ssl_server(
1115 lambda: proto, test_utils.SIGNED_CERTFILE)
1116
1117 sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1118 sslcontext_client.options |= ssl.OP_NO_SSLv2
1119 sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1120 sslcontext_client.load_verify_locations(
1121 cafile=test_utils.SIGNING_CA)
1122 if hasattr(sslcontext_client, 'check_hostname'):
1123 sslcontext_client.check_hostname = True
1124
1125 # incorrect server_hostname
1126 f_c = self.loop.create_connection(MyProto, host, port,
1127 ssl=sslcontext_client)
1128 with mock.patch.object(self.loop, 'call_exception_handler'):
1129 with test_utils.disable_logger():
1130 with self.assertRaisesRegex(
1131 ssl.CertificateError,
1132 "IP address mismatch, certificate is not valid for "
1133 "'127.0.0.1'"):
1134 self.loop.run_until_complete(f_c)
1135
1136 # close connection
1137 # transport is None because TLS ALERT aborted the handshake
1138 self.assertIsNone(proto.transport)
1139 server.close()
1140
1141 @socket_helper.skip_unless_bind_unix_socket
1142 @unittest.skipIf(ssl is None, 'No ssl module')
1143 def test_create_unix_server_ssl_verified(self):
1144 proto = MyProto(loop=self.loop)
1145 server, path = self._make_ssl_unix_server(
1146 lambda: proto, test_utils.SIGNED_CERTFILE)
1147
1148 sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1149 sslcontext_client.options |= ssl.OP_NO_SSLv2
1150 sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1151 sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
1152 if hasattr(sslcontext_client, 'check_hostname'):
1153 sslcontext_client.check_hostname = True
1154
1155 # Connection succeeds with correct CA and server hostname.
1156 f_c = self.loop.create_unix_connection(MyProto, path,
1157 ssl=sslcontext_client,
1158 server_hostname='localhost')
1159 client, pr = self.loop.run_until_complete(f_c)
1160 self.loop.run_until_complete(proto.connected)
1161
1162 # close connection
1163 proto.transport.close()
1164 client.close()
1165 server.close()
1166 self.loop.run_until_complete(proto.done)
1167
1168 @unittest.skipIf(ssl is None, 'No ssl module')
1169 def test_create_server_ssl_verified(self):
1170 proto = MyProto(loop=self.loop)
1171 server, host, port = self._make_ssl_server(
1172 lambda: proto, test_utils.SIGNED_CERTFILE)
1173
1174 sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1175 sslcontext_client.options |= ssl.OP_NO_SSLv2
1176 sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1177 sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
1178 if hasattr(sslcontext_client, 'check_hostname'):
1179 sslcontext_client.check_hostname = True
1180
1181 # Connection succeeds with correct CA and server hostname.
1182 f_c = self.loop.create_connection(MyProto, host, port,
1183 ssl=sslcontext_client,
1184 server_hostname='localhost')
1185 client, pr = self.loop.run_until_complete(f_c)
1186 self.loop.run_until_complete(proto.connected)
1187
1188 # extra info is available
1189 self.check_ssl_extra_info(client, peername=(host, port),
1190 peercert=test_utils.PEERCERT)
1191
1192 # close connection
1193 proto.transport.close()
1194 client.close()
1195 server.close()
1196 self.loop.run_until_complete(proto.done)
1197
1198 def test_create_server_sock(self):
1199 proto = self.loop.create_future()
1200
1201 class ESC[4;38;5;81mTestMyProto(ESC[4;38;5;149mMyProto):
1202 def connection_made(self, transport):
1203 super().connection_made(transport)
1204 proto.set_result(self)
1205
1206 sock_ob = socket.create_server(('0.0.0.0', 0))
1207
1208 f = self.loop.create_server(TestMyProto, sock=sock_ob)
1209 server = self.loop.run_until_complete(f)
1210 sock = server.sockets[0]
1211 self.assertEqual(sock.fileno(), sock_ob.fileno())
1212
1213 host, port = sock.getsockname()
1214 self.assertEqual(host, '0.0.0.0')
1215 client = socket.socket()
1216 client.connect(('127.0.0.1', port))
1217 client.send(b'xxx')
1218 client.close()
1219 server.close()
1220
1221 def test_create_server_addr_in_use(self):
1222 sock_ob = socket.create_server(('0.0.0.0', 0))
1223
1224 f = self.loop.create_server(MyProto, sock=sock_ob)
1225 server = self.loop.run_until_complete(f)
1226 sock = server.sockets[0]
1227 host, port = sock.getsockname()
1228
1229 f = self.loop.create_server(MyProto, host=host, port=port)
1230 with self.assertRaises(OSError) as cm:
1231 self.loop.run_until_complete(f)
1232 self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
1233
1234 server.close()
1235
1236 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
1237 def test_create_server_dual_stack(self):
1238 f_proto = self.loop.create_future()
1239
1240 class ESC[4;38;5;81mTestMyProto(ESC[4;38;5;149mMyProto):
1241 def connection_made(self, transport):
1242 super().connection_made(transport)
1243 f_proto.set_result(self)
1244
1245 try_count = 0
1246 while True:
1247 try:
1248 port = socket_helper.find_unused_port()
1249 f = self.loop.create_server(TestMyProto, host=None, port=port)
1250 server = self.loop.run_until_complete(f)
1251 except OSError as ex:
1252 if ex.errno == errno.EADDRINUSE:
1253 try_count += 1
1254 self.assertGreaterEqual(5, try_count)
1255 continue
1256 else:
1257 raise
1258 else:
1259 break
1260 client = socket.socket()
1261 client.connect(('127.0.0.1', port))
1262 client.send(b'xxx')
1263 proto = self.loop.run_until_complete(f_proto)
1264 proto.transport.close()
1265 client.close()
1266
1267 f_proto = self.loop.create_future()
1268 client = socket.socket(socket.AF_INET6)
1269 client.connect(('::1', port))
1270 client.send(b'xxx')
1271 proto = self.loop.run_until_complete(f_proto)
1272 proto.transport.close()
1273 client.close()
1274
1275 server.close()
1276
1277 @socket_helper.skip_if_tcp_blackhole
1278 def test_server_close(self):
1279 f = self.loop.create_server(MyProto, '0.0.0.0', 0)
1280 server = self.loop.run_until_complete(f)
1281 sock = server.sockets[0]
1282 host, port = sock.getsockname()
1283
1284 client = socket.socket()
1285 client.connect(('127.0.0.1', port))
1286 client.send(b'xxx')
1287 client.close()
1288
1289 server.close()
1290
1291 client = socket.socket()
1292 self.assertRaises(
1293 ConnectionRefusedError, client.connect, ('127.0.0.1', port))
1294 client.close()
1295
1296 def _test_create_datagram_endpoint(self, local_addr, family):
1297 class ESC[4;38;5;81mTestMyDatagramProto(ESC[4;38;5;149mMyDatagramProto):
1298 def __init__(inner_self):
1299 super().__init__(loop=self.loop)
1300
1301 def datagram_received(self, data, addr):
1302 super().datagram_received(data, addr)
1303 self.transport.sendto(b'resp:'+data, addr)
1304
1305 coro = self.loop.create_datagram_endpoint(
1306 TestMyDatagramProto, local_addr=local_addr, family=family)
1307 s_transport, server = self.loop.run_until_complete(coro)
1308 sockname = s_transport.get_extra_info('sockname')
1309 host, port = socket.getnameinfo(
1310 sockname, socket.NI_NUMERICHOST|socket.NI_NUMERICSERV)
1311
1312 self.assertIsInstance(s_transport, asyncio.Transport)
1313 self.assertIsInstance(server, TestMyDatagramProto)
1314 self.assertEqual('INITIALIZED', server.state)
1315 self.assertIs(server.transport, s_transport)
1316
1317 coro = self.loop.create_datagram_endpoint(
1318 lambda: MyDatagramProto(loop=self.loop),
1319 remote_addr=(host, port))
1320 transport, client = self.loop.run_until_complete(coro)
1321
1322 self.assertIsInstance(transport, asyncio.Transport)
1323 self.assertIsInstance(client, MyDatagramProto)
1324 self.assertEqual('INITIALIZED', client.state)
1325 self.assertIs(client.transport, transport)
1326
1327 transport.sendto(b'xxx')
1328 test_utils.run_until(self.loop, lambda: server.nbytes)
1329 self.assertEqual(3, server.nbytes)
1330 test_utils.run_until(self.loop, lambda: client.nbytes)
1331
1332 # received
1333 self.assertEqual(8, client.nbytes)
1334
1335 # extra info is available
1336 self.assertIsNotNone(transport.get_extra_info('sockname'))
1337
1338 # close connection
1339 transport.close()
1340 self.loop.run_until_complete(client.done)
1341 self.assertEqual('CLOSED', client.state)
1342 server.transport.close()
1343
1344 def test_create_datagram_endpoint(self):
1345 self._test_create_datagram_endpoint(('127.0.0.1', 0), socket.AF_INET)
1346
1347 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
1348 def test_create_datagram_endpoint_ipv6(self):
1349 self._test_create_datagram_endpoint(('::1', 0), socket.AF_INET6)
1350
1351 def test_create_datagram_endpoint_sock(self):
1352 sock = None
1353 local_address = ('127.0.0.1', 0)
1354 infos = self.loop.run_until_complete(
1355 self.loop.getaddrinfo(
1356 *local_address, type=socket.SOCK_DGRAM))
1357 for family, type, proto, cname, address in infos:
1358 try:
1359 sock = socket.socket(family=family, type=type, proto=proto)
1360 sock.setblocking(False)
1361 sock.bind(address)
1362 except:
1363 pass
1364 else:
1365 break
1366 else:
1367 self.fail('Can not create socket.')
1368
1369 f = self.loop.create_datagram_endpoint(
1370 lambda: MyDatagramProto(loop=self.loop), sock=sock)
1371 tr, pr = self.loop.run_until_complete(f)
1372 self.assertIsInstance(tr, asyncio.Transport)
1373 self.assertIsInstance(pr, MyDatagramProto)
1374 tr.close()
1375 self.loop.run_until_complete(pr.done)
1376
1377 def test_internal_fds(self):
1378 loop = self.create_event_loop()
1379 if not isinstance(loop, selector_events.BaseSelectorEventLoop):
1380 loop.close()
1381 self.skipTest('loop is not a BaseSelectorEventLoop')
1382
1383 self.assertEqual(1, loop._internal_fds)
1384 loop.close()
1385 self.assertEqual(0, loop._internal_fds)
1386 self.assertIsNone(loop._csock)
1387 self.assertIsNone(loop._ssock)
1388
1389 @unittest.skipUnless(sys.platform != 'win32',
1390 "Don't support pipes for Windows")
1391 def test_read_pipe(self):
1392 proto = MyReadPipeProto(loop=self.loop)
1393
1394 rpipe, wpipe = os.pipe()
1395 pipeobj = io.open(rpipe, 'rb', 1024)
1396
1397 async def connect():
1398 t, p = await self.loop.connect_read_pipe(
1399 lambda: proto, pipeobj)
1400 self.assertIs(p, proto)
1401 self.assertIs(t, proto.transport)
1402 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1403 self.assertEqual(0, proto.nbytes)
1404
1405 self.loop.run_until_complete(connect())
1406
1407 os.write(wpipe, b'1')
1408 test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
1409 self.assertEqual(1, proto.nbytes)
1410
1411 os.write(wpipe, b'2345')
1412 test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
1413 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1414 self.assertEqual(5, proto.nbytes)
1415
1416 os.close(wpipe)
1417 self.loop.run_until_complete(proto.done)
1418 self.assertEqual(
1419 ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
1420 # extra info is available
1421 self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1422
1423 @unittest.skipUnless(sys.platform != 'win32',
1424 "Don't support pipes for Windows")
1425 def test_unclosed_pipe_transport(self):
1426 # This test reproduces the issue #314 on GitHub
1427 loop = self.create_event_loop()
1428 read_proto = MyReadPipeProto(loop=loop)
1429 write_proto = MyWritePipeProto(loop=loop)
1430
1431 rpipe, wpipe = os.pipe()
1432 rpipeobj = io.open(rpipe, 'rb', 1024)
1433 wpipeobj = io.open(wpipe, 'w', 1024, encoding="utf-8")
1434
1435 async def connect():
1436 read_transport, _ = await loop.connect_read_pipe(
1437 lambda: read_proto, rpipeobj)
1438 write_transport, _ = await loop.connect_write_pipe(
1439 lambda: write_proto, wpipeobj)
1440 return read_transport, write_transport
1441
1442 # Run and close the loop without closing the transports
1443 read_transport, write_transport = loop.run_until_complete(connect())
1444 loop.close()
1445
1446 # These 'repr' calls used to raise an AttributeError
1447 # See Issue #314 on GitHub
1448 self.assertIn('open', repr(read_transport))
1449 self.assertIn('open', repr(write_transport))
1450
1451 # Clean up (avoid ResourceWarning)
1452 rpipeobj.close()
1453 wpipeobj.close()
1454 read_transport._pipe = None
1455 write_transport._pipe = None
1456
1457 @unittest.skipUnless(sys.platform != 'win32',
1458 "Don't support pipes for Windows")
1459 @unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()')
1460 def test_read_pty_output(self):
1461 proto = MyReadPipeProto(loop=self.loop)
1462
1463 master, slave = os.openpty()
1464 master_read_obj = io.open(master, 'rb', 0)
1465
1466 async def connect():
1467 t, p = await self.loop.connect_read_pipe(lambda: proto,
1468 master_read_obj)
1469 self.assertIs(p, proto)
1470 self.assertIs(t, proto.transport)
1471 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1472 self.assertEqual(0, proto.nbytes)
1473
1474 self.loop.run_until_complete(connect())
1475
1476 os.write(slave, b'1')
1477 test_utils.run_until(self.loop, lambda: proto.nbytes)
1478 self.assertEqual(1, proto.nbytes)
1479
1480 os.write(slave, b'2345')
1481 test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
1482 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1483 self.assertEqual(5, proto.nbytes)
1484
1485 os.close(slave)
1486 proto.transport.close()
1487 self.loop.run_until_complete(proto.done)
1488 self.assertEqual(
1489 ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
1490 # extra info is available
1491 self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1492
1493 @unittest.skipUnless(sys.platform != 'win32',
1494 "Don't support pipes for Windows")
1495 def test_write_pipe(self):
1496 rpipe, wpipe = os.pipe()
1497 pipeobj = io.open(wpipe, 'wb', 1024)
1498
1499 proto = MyWritePipeProto(loop=self.loop)
1500 connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
1501 transport, p = self.loop.run_until_complete(connect)
1502 self.assertIs(p, proto)
1503 self.assertIs(transport, proto.transport)
1504 self.assertEqual('CONNECTED', proto.state)
1505
1506 transport.write(b'1')
1507
1508 data = bytearray()
1509 def reader(data):
1510 chunk = os.read(rpipe, 1024)
1511 data += chunk
1512 return len(data)
1513
1514 test_utils.run_until(self.loop, lambda: reader(data) >= 1)
1515 self.assertEqual(b'1', data)
1516
1517 transport.write(b'2345')
1518 test_utils.run_until(self.loop, lambda: reader(data) >= 5)
1519 self.assertEqual(b'12345', data)
1520 self.assertEqual('CONNECTED', proto.state)
1521
1522 os.close(rpipe)
1523
1524 # extra info is available
1525 self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1526
1527 # close connection
1528 proto.transport.close()
1529 self.loop.run_until_complete(proto.done)
1530 self.assertEqual('CLOSED', proto.state)
1531
1532 @unittest.skipUnless(sys.platform != 'win32',
1533 "Don't support pipes for Windows")
1534 def test_write_pipe_disconnect_on_close(self):
1535 rsock, wsock = socket.socketpair()
1536 rsock.setblocking(False)
1537 pipeobj = io.open(wsock.detach(), 'wb', 1024)
1538
1539 proto = MyWritePipeProto(loop=self.loop)
1540 connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
1541 transport, p = self.loop.run_until_complete(connect)
1542 self.assertIs(p, proto)
1543 self.assertIs(transport, proto.transport)
1544 self.assertEqual('CONNECTED', proto.state)
1545
1546 transport.write(b'1')
1547 data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024))
1548 self.assertEqual(b'1', data)
1549
1550 rsock.close()
1551
1552 self.loop.run_until_complete(proto.done)
1553 self.assertEqual('CLOSED', proto.state)
1554
1555 @unittest.skipUnless(sys.platform != 'win32',
1556 "Don't support pipes for Windows")
1557 @unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()')
1558 # select, poll and kqueue don't support character devices (PTY) on Mac OS X
1559 # older than 10.6 (Snow Leopard)
1560 @support.requires_mac_ver(10, 6)
1561 def test_write_pty(self):
1562 master, slave = os.openpty()
1563 slave_write_obj = io.open(slave, 'wb', 0)
1564
1565 proto = MyWritePipeProto(loop=self.loop)
1566 connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj)
1567 transport, p = self.loop.run_until_complete(connect)
1568 self.assertIs(p, proto)
1569 self.assertIs(transport, proto.transport)
1570 self.assertEqual('CONNECTED', proto.state)
1571
1572 transport.write(b'1')
1573
1574 data = bytearray()
1575 def reader(data):
1576 chunk = os.read(master, 1024)
1577 data += chunk
1578 return len(data)
1579
1580 test_utils.run_until(self.loop, lambda: reader(data) >= 1,
1581 timeout=support.SHORT_TIMEOUT)
1582 self.assertEqual(b'1', data)
1583
1584 transport.write(b'2345')
1585 test_utils.run_until(self.loop, lambda: reader(data) >= 5,
1586 timeout=support.SHORT_TIMEOUT)
1587 self.assertEqual(b'12345', data)
1588 self.assertEqual('CONNECTED', proto.state)
1589
1590 os.close(master)
1591
1592 # extra info is available
1593 self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1594
1595 # close connection
1596 proto.transport.close()
1597 self.loop.run_until_complete(proto.done)
1598 self.assertEqual('CLOSED', proto.state)
1599
1600 @unittest.skipUnless(sys.platform != 'win32',
1601 "Don't support pipes for Windows")
1602 @unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()')
1603 # select, poll and kqueue don't support character devices (PTY) on Mac OS X
1604 # older than 10.6 (Snow Leopard)
1605 @support.requires_mac_ver(10, 6)
1606 def test_bidirectional_pty(self):
1607 master, read_slave = os.openpty()
1608 write_slave = os.dup(read_slave)
1609 tty.setraw(read_slave)
1610
1611 slave_read_obj = io.open(read_slave, 'rb', 0)
1612 read_proto = MyReadPipeProto(loop=self.loop)
1613 read_connect = self.loop.connect_read_pipe(lambda: read_proto,
1614 slave_read_obj)
1615 read_transport, p = self.loop.run_until_complete(read_connect)
1616 self.assertIs(p, read_proto)
1617 self.assertIs(read_transport, read_proto.transport)
1618 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1619 self.assertEqual(0, read_proto.nbytes)
1620
1621
1622 slave_write_obj = io.open(write_slave, 'wb', 0)
1623 write_proto = MyWritePipeProto(loop=self.loop)
1624 write_connect = self.loop.connect_write_pipe(lambda: write_proto,
1625 slave_write_obj)
1626 write_transport, p = self.loop.run_until_complete(write_connect)
1627 self.assertIs(p, write_proto)
1628 self.assertIs(write_transport, write_proto.transport)
1629 self.assertEqual('CONNECTED', write_proto.state)
1630
1631 data = bytearray()
1632 def reader(data):
1633 chunk = os.read(master, 1024)
1634 data += chunk
1635 return len(data)
1636
1637 write_transport.write(b'1')
1638 test_utils.run_until(self.loop, lambda: reader(data) >= 1,
1639 timeout=support.SHORT_TIMEOUT)
1640 self.assertEqual(b'1', data)
1641 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1642 self.assertEqual('CONNECTED', write_proto.state)
1643
1644 os.write(master, b'a')
1645 test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
1646 timeout=support.SHORT_TIMEOUT)
1647 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1648 self.assertEqual(1, read_proto.nbytes)
1649 self.assertEqual('CONNECTED', write_proto.state)
1650
1651 write_transport.write(b'2345')
1652 test_utils.run_until(self.loop, lambda: reader(data) >= 5,
1653 timeout=support.SHORT_TIMEOUT)
1654 self.assertEqual(b'12345', data)
1655 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1656 self.assertEqual('CONNECTED', write_proto.state)
1657
1658 os.write(master, b'bcde')
1659 test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
1660 timeout=support.SHORT_TIMEOUT)
1661 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1662 self.assertEqual(5, read_proto.nbytes)
1663 self.assertEqual('CONNECTED', write_proto.state)
1664
1665 os.close(master)
1666
1667 read_transport.close()
1668 self.loop.run_until_complete(read_proto.done)
1669 self.assertEqual(
1670 ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)
1671
1672 write_transport.close()
1673 self.loop.run_until_complete(write_proto.done)
1674 self.assertEqual('CLOSED', write_proto.state)
1675
1676 def test_prompt_cancellation(self):
1677 r, w = socket.socketpair()
1678 r.setblocking(False)
1679 f = self.loop.create_task(self.loop.sock_recv(r, 1))
1680 ov = getattr(f, 'ov', None)
1681 if ov is not None:
1682 self.assertTrue(ov.pending)
1683
1684 async def main():
1685 try:
1686 self.loop.call_soon(f.cancel)
1687 await f
1688 except asyncio.CancelledError:
1689 res = 'cancelled'
1690 else:
1691 res = None
1692 finally:
1693 self.loop.stop()
1694 return res
1695
1696 start = time.monotonic()
1697 t = self.loop.create_task(main())
1698 self.loop.run_forever()
1699 elapsed = time.monotonic() - start
1700
1701 self.assertLess(elapsed, 0.1)
1702 self.assertEqual(t.result(), 'cancelled')
1703 self.assertRaises(asyncio.CancelledError, f.result)
1704 if ov is not None:
1705 self.assertFalse(ov.pending)
1706 self.loop._stop_serving(r)
1707
1708 r.close()
1709 w.close()
1710
1711 def test_timeout_rounding(self):
1712 def _run_once():
1713 self.loop._run_once_counter += 1
1714 orig_run_once()
1715
1716 orig_run_once = self.loop._run_once
1717 self.loop._run_once_counter = 0
1718 self.loop._run_once = _run_once
1719
1720 async def wait():
1721 loop = self.loop
1722 await asyncio.sleep(1e-2)
1723 await asyncio.sleep(1e-4)
1724 await asyncio.sleep(1e-6)
1725 await asyncio.sleep(1e-8)
1726 await asyncio.sleep(1e-10)
1727
1728 self.loop.run_until_complete(wait())
1729 # The ideal number of call is 12, but on some platforms, the selector
1730 # may sleep at little bit less than timeout depending on the resolution
1731 # of the clock used by the kernel. Tolerate a few useless calls on
1732 # these platforms.
1733 self.assertLessEqual(self.loop._run_once_counter, 20,
1734 {'clock_resolution': self.loop._clock_resolution,
1735 'selector': self.loop._selector.__class__.__name__})
1736
1737 def test_remove_fds_after_closing(self):
1738 loop = self.create_event_loop()
1739 callback = lambda: None
1740 r, w = socket.socketpair()
1741 self.addCleanup(r.close)
1742 self.addCleanup(w.close)
1743 loop.add_reader(r, callback)
1744 loop.add_writer(w, callback)
1745 loop.close()
1746 self.assertFalse(loop.remove_reader(r))
1747 self.assertFalse(loop.remove_writer(w))
1748
1749 def test_add_fds_after_closing(self):
1750 loop = self.create_event_loop()
1751 callback = lambda: None
1752 r, w = socket.socketpair()
1753 self.addCleanup(r.close)
1754 self.addCleanup(w.close)
1755 loop.close()
1756 with self.assertRaises(RuntimeError):
1757 loop.add_reader(r, callback)
1758 with self.assertRaises(RuntimeError):
1759 loop.add_writer(w, callback)
1760
1761 def test_close_running_event_loop(self):
1762 async def close_loop(loop):
1763 self.loop.close()
1764
1765 coro = close_loop(self.loop)
1766 with self.assertRaises(RuntimeError):
1767 self.loop.run_until_complete(coro)
1768
1769 def test_close(self):
1770 self.loop.close()
1771
1772 async def test():
1773 pass
1774
1775 func = lambda: False
1776 coro = test()
1777 self.addCleanup(coro.close)
1778
1779 # operation blocked when the loop is closed
1780 with self.assertRaises(RuntimeError):
1781 self.loop.run_forever()
1782 with self.assertRaises(RuntimeError):
1783 fut = self.loop.create_future()
1784 self.loop.run_until_complete(fut)
1785 with self.assertRaises(RuntimeError):
1786 self.loop.call_soon(func)
1787 with self.assertRaises(RuntimeError):
1788 self.loop.call_soon_threadsafe(func)
1789 with self.assertRaises(RuntimeError):
1790 self.loop.call_later(1.0, func)
1791 with self.assertRaises(RuntimeError):
1792 self.loop.call_at(self.loop.time() + .0, func)
1793 with self.assertRaises(RuntimeError):
1794 self.loop.create_task(coro)
1795 with self.assertRaises(RuntimeError):
1796 self.loop.add_signal_handler(signal.SIGTERM, func)
1797
1798 # run_in_executor test is tricky: the method is a coroutine,
1799 # but run_until_complete cannot be called on closed loop.
1800 # Thus iterate once explicitly.
1801 with self.assertRaises(RuntimeError):
1802 it = self.loop.run_in_executor(None, func).__await__()
1803 next(it)
1804
1805
1806 class ESC[4;38;5;81mSubprocessTestsMixin:
1807
1808 def check_terminated(self, returncode):
1809 if sys.platform == 'win32':
1810 self.assertIsInstance(returncode, int)
1811 # expect 1 but sometimes get 0
1812 else:
1813 self.assertEqual(-signal.SIGTERM, returncode)
1814
1815 def check_killed(self, returncode):
1816 if sys.platform == 'win32':
1817 self.assertIsInstance(returncode, int)
1818 # expect 1 but sometimes get 0
1819 else:
1820 self.assertEqual(-signal.SIGKILL, returncode)
1821
1822 def test_subprocess_exec(self):
1823 prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1824
1825 connect = self.loop.subprocess_exec(
1826 functools.partial(MySubprocessProtocol, self.loop),
1827 sys.executable, prog)
1828
1829 transp, proto = self.loop.run_until_complete(connect)
1830 self.assertIsInstance(proto, MySubprocessProtocol)
1831 self.loop.run_until_complete(proto.connected)
1832 self.assertEqual('CONNECTED', proto.state)
1833
1834 stdin = transp.get_pipe_transport(0)
1835 stdin.write(b'Python The Winner')
1836 self.loop.run_until_complete(proto.got_data[1].wait())
1837 with test_utils.disable_logger():
1838 transp.close()
1839 self.loop.run_until_complete(proto.completed)
1840 self.check_killed(proto.returncode)
1841 self.assertEqual(b'Python The Winner', proto.data[1])
1842
1843 def test_subprocess_interactive(self):
1844 prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1845
1846 connect = self.loop.subprocess_exec(
1847 functools.partial(MySubprocessProtocol, self.loop),
1848 sys.executable, prog)
1849
1850 transp, proto = self.loop.run_until_complete(connect)
1851 self.assertIsInstance(proto, MySubprocessProtocol)
1852 self.loop.run_until_complete(proto.connected)
1853 self.assertEqual('CONNECTED', proto.state)
1854
1855 stdin = transp.get_pipe_transport(0)
1856 stdin.write(b'Python ')
1857 self.loop.run_until_complete(proto.got_data[1].wait())
1858 proto.got_data[1].clear()
1859 self.assertEqual(b'Python ', proto.data[1])
1860
1861 stdin.write(b'The Winner')
1862 self.loop.run_until_complete(proto.got_data[1].wait())
1863 self.assertEqual(b'Python The Winner', proto.data[1])
1864
1865 with test_utils.disable_logger():
1866 transp.close()
1867 self.loop.run_until_complete(proto.completed)
1868 self.check_killed(proto.returncode)
1869
1870 def test_subprocess_shell(self):
1871 connect = self.loop.subprocess_shell(
1872 functools.partial(MySubprocessProtocol, self.loop),
1873 'echo Python')
1874 transp, proto = self.loop.run_until_complete(connect)
1875 self.assertIsInstance(proto, MySubprocessProtocol)
1876 self.loop.run_until_complete(proto.connected)
1877
1878 transp.get_pipe_transport(0).close()
1879 self.loop.run_until_complete(proto.completed)
1880 self.assertEqual(0, proto.returncode)
1881 self.assertTrue(all(f.done() for f in proto.disconnects.values()))
1882 self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python')
1883 self.assertEqual(proto.data[2], b'')
1884 transp.close()
1885
1886 def test_subprocess_exitcode(self):
1887 connect = self.loop.subprocess_shell(
1888 functools.partial(MySubprocessProtocol, self.loop),
1889 'exit 7', stdin=None, stdout=None, stderr=None)
1890
1891 transp, proto = self.loop.run_until_complete(connect)
1892 self.assertIsInstance(proto, MySubprocessProtocol)
1893 self.loop.run_until_complete(proto.completed)
1894 self.assertEqual(7, proto.returncode)
1895 transp.close()
1896
1897 def test_subprocess_close_after_finish(self):
1898 connect = self.loop.subprocess_shell(
1899 functools.partial(MySubprocessProtocol, self.loop),
1900 'exit 7', stdin=None, stdout=None, stderr=None)
1901
1902 transp, proto = self.loop.run_until_complete(connect)
1903 self.assertIsInstance(proto, MySubprocessProtocol)
1904 self.assertIsNone(transp.get_pipe_transport(0))
1905 self.assertIsNone(transp.get_pipe_transport(1))
1906 self.assertIsNone(transp.get_pipe_transport(2))
1907 self.loop.run_until_complete(proto.completed)
1908 self.assertEqual(7, proto.returncode)
1909 self.assertIsNone(transp.close())
1910
1911 def test_subprocess_kill(self):
1912 prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1913
1914 connect = self.loop.subprocess_exec(
1915 functools.partial(MySubprocessProtocol, self.loop),
1916 sys.executable, prog)
1917
1918 transp, proto = self.loop.run_until_complete(connect)
1919 self.assertIsInstance(proto, MySubprocessProtocol)
1920 self.loop.run_until_complete(proto.connected)
1921
1922 transp.kill()
1923 self.loop.run_until_complete(proto.completed)
1924 self.check_killed(proto.returncode)
1925 transp.close()
1926
1927 def test_subprocess_terminate(self):
1928 prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1929
1930 connect = self.loop.subprocess_exec(
1931 functools.partial(MySubprocessProtocol, self.loop),
1932 sys.executable, prog)
1933
1934 transp, proto = self.loop.run_until_complete(connect)
1935 self.assertIsInstance(proto, MySubprocessProtocol)
1936 self.loop.run_until_complete(proto.connected)
1937
1938 transp.terminate()
1939 self.loop.run_until_complete(proto.completed)
1940 self.check_terminated(proto.returncode)
1941 transp.close()
1942
1943 @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
1944 def test_subprocess_send_signal(self):
1945 # bpo-31034: Make sure that we get the default signal handler (killing
1946 # the process). The parent process may have decided to ignore SIGHUP,
1947 # and signal handlers are inherited.
1948 old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
1949 try:
1950 prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1951
1952 connect = self.loop.subprocess_exec(
1953 functools.partial(MySubprocessProtocol, self.loop),
1954 sys.executable, prog)
1955
1956
1957 transp, proto = self.loop.run_until_complete(connect)
1958 self.assertIsInstance(proto, MySubprocessProtocol)
1959 self.loop.run_until_complete(proto.connected)
1960
1961 transp.send_signal(signal.SIGHUP)
1962 self.loop.run_until_complete(proto.completed)
1963 self.assertEqual(-signal.SIGHUP, proto.returncode)
1964 transp.close()
1965 finally:
1966 signal.signal(signal.SIGHUP, old_handler)
1967
1968 def test_subprocess_stderr(self):
1969 prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
1970
1971 connect = self.loop.subprocess_exec(
1972 functools.partial(MySubprocessProtocol, self.loop),
1973 sys.executable, prog)
1974
1975 transp, proto = self.loop.run_until_complete(connect)
1976 self.assertIsInstance(proto, MySubprocessProtocol)
1977 self.loop.run_until_complete(proto.connected)
1978
1979 stdin = transp.get_pipe_transport(0)
1980 stdin.write(b'test')
1981
1982 self.loop.run_until_complete(proto.completed)
1983
1984 transp.close()
1985 self.assertEqual(b'OUT:test', proto.data[1])
1986 self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2])
1987 self.assertEqual(0, proto.returncode)
1988
1989 def test_subprocess_stderr_redirect_to_stdout(self):
1990 prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
1991
1992 connect = self.loop.subprocess_exec(
1993 functools.partial(MySubprocessProtocol, self.loop),
1994 sys.executable, prog, stderr=subprocess.STDOUT)
1995
1996
1997 transp, proto = self.loop.run_until_complete(connect)
1998 self.assertIsInstance(proto, MySubprocessProtocol)
1999 self.loop.run_until_complete(proto.connected)
2000
2001 stdin = transp.get_pipe_transport(0)
2002 self.assertIsNotNone(transp.get_pipe_transport(1))
2003 self.assertIsNone(transp.get_pipe_transport(2))
2004
2005 stdin.write(b'test')
2006 self.loop.run_until_complete(proto.completed)
2007 self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'),
2008 proto.data[1])
2009 self.assertEqual(b'', proto.data[2])
2010
2011 transp.close()
2012 self.assertEqual(0, proto.returncode)
2013
2014 def test_subprocess_close_client_stream(self):
2015 prog = os.path.join(os.path.dirname(__file__), 'echo3.py')
2016
2017 connect = self.loop.subprocess_exec(
2018 functools.partial(MySubprocessProtocol, self.loop),
2019 sys.executable, prog)
2020
2021 transp, proto = self.loop.run_until_complete(connect)
2022 self.assertIsInstance(proto, MySubprocessProtocol)
2023 self.loop.run_until_complete(proto.connected)
2024
2025 stdin = transp.get_pipe_transport(0)
2026 stdout = transp.get_pipe_transport(1)
2027 stdin.write(b'test')
2028 self.loop.run_until_complete(proto.got_data[1].wait())
2029 self.assertEqual(b'OUT:test', proto.data[1])
2030
2031 stdout.close()
2032 self.loop.run_until_complete(proto.disconnects[1])
2033 stdin.write(b'xxx')
2034 self.loop.run_until_complete(proto.got_data[2].wait())
2035 if sys.platform != 'win32':
2036 self.assertEqual(b'ERR:BrokenPipeError', proto.data[2])
2037 else:
2038 # After closing the read-end of a pipe, writing to the
2039 # write-end using os.write() fails with errno==EINVAL and
2040 # GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using
2041 # WriteFile() we get ERROR_BROKEN_PIPE as expected.)
2042 self.assertEqual(b'ERR:OSError', proto.data[2])
2043 with test_utils.disable_logger():
2044 transp.close()
2045 self.loop.run_until_complete(proto.completed)
2046 self.check_killed(proto.returncode)
2047
2048 def test_subprocess_wait_no_same_group(self):
2049 # start the new process in a new session
2050 connect = self.loop.subprocess_shell(
2051 functools.partial(MySubprocessProtocol, self.loop),
2052 'exit 7', stdin=None, stdout=None, stderr=None,
2053 start_new_session=True)
2054 transp, proto = self.loop.run_until_complete(connect)
2055 self.assertIsInstance(proto, MySubprocessProtocol)
2056 self.loop.run_until_complete(proto.completed)
2057 self.assertEqual(7, proto.returncode)
2058 transp.close()
2059
2060 def test_subprocess_exec_invalid_args(self):
2061 async def connect(**kwds):
2062 await self.loop.subprocess_exec(
2063 asyncio.SubprocessProtocol,
2064 'pwd', **kwds)
2065
2066 with self.assertRaises(ValueError):
2067 self.loop.run_until_complete(connect(universal_newlines=True))
2068 with self.assertRaises(ValueError):
2069 self.loop.run_until_complete(connect(bufsize=4096))
2070 with self.assertRaises(ValueError):
2071 self.loop.run_until_complete(connect(shell=True))
2072
2073 def test_subprocess_shell_invalid_args(self):
2074
2075 async def connect(cmd=None, **kwds):
2076 if not cmd:
2077 cmd = 'pwd'
2078 await self.loop.subprocess_shell(
2079 asyncio.SubprocessProtocol,
2080 cmd, **kwds)
2081
2082 with self.assertRaises(ValueError):
2083 self.loop.run_until_complete(connect(['ls', '-l']))
2084 with self.assertRaises(ValueError):
2085 self.loop.run_until_complete(connect(universal_newlines=True))
2086 with self.assertRaises(ValueError):
2087 self.loop.run_until_complete(connect(bufsize=4096))
2088 with self.assertRaises(ValueError):
2089 self.loop.run_until_complete(connect(shell=False))
2090
2091
2092 if sys.platform == 'win32':
2093
2094 class ESC[4;38;5;81mSelectEventLoopTests(ESC[4;38;5;149mEventLoopTestsMixin,
2095 ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2096
2097 def create_event_loop(self):
2098 return asyncio.SelectorEventLoop()
2099
2100 class ESC[4;38;5;81mProactorEventLoopTests(ESC[4;38;5;149mEventLoopTestsMixin,
2101 ESC[4;38;5;149mSubprocessTestsMixin,
2102 ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2103
2104 def create_event_loop(self):
2105 return asyncio.ProactorEventLoop()
2106
2107 def test_reader_callback(self):
2108 raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2109
2110 def test_reader_callback_cancel(self):
2111 raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2112
2113 def test_writer_callback(self):
2114 raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
2115
2116 def test_writer_callback_cancel(self):
2117 raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
2118
2119 def test_remove_fds_after_closing(self):
2120 raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2121 else:
2122 import selectors
2123
2124 class ESC[4;38;5;81mUnixEventLoopTestsMixin(ESC[4;38;5;149mEventLoopTestsMixin):
2125 def setUp(self):
2126 super().setUp()
2127 with warnings.catch_warnings():
2128 warnings.simplefilter('ignore', DeprecationWarning)
2129 watcher = asyncio.SafeChildWatcher()
2130 watcher.attach_loop(self.loop)
2131 asyncio.set_child_watcher(watcher)
2132
2133 def tearDown(self):
2134 with warnings.catch_warnings():
2135 warnings.simplefilter('ignore', DeprecationWarning)
2136 asyncio.set_child_watcher(None)
2137 super().tearDown()
2138
2139
2140 if hasattr(selectors, 'KqueueSelector'):
2141 class ESC[4;38;5;81mKqueueEventLoopTests(ESC[4;38;5;149mUnixEventLoopTestsMixin,
2142 ESC[4;38;5;149mSubprocessTestsMixin,
2143 ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2144
2145 def create_event_loop(self):
2146 return asyncio.SelectorEventLoop(
2147 selectors.KqueueSelector())
2148
2149 # kqueue doesn't support character devices (PTY) on Mac OS X older
2150 # than 10.9 (Maverick)
2151 @support.requires_mac_ver(10, 9)
2152 # Issue #20667: KqueueEventLoopTests.test_read_pty_output()
2153 # hangs on OpenBSD 5.5
2154 @unittest.skipIf(sys.platform.startswith('openbsd'),
2155 'test hangs on OpenBSD')
2156 def test_read_pty_output(self):
2157 super().test_read_pty_output()
2158
2159 # kqueue doesn't support character devices (PTY) on Mac OS X older
2160 # than 10.9 (Maverick)
2161 @support.requires_mac_ver(10, 9)
2162 def test_write_pty(self):
2163 super().test_write_pty()
2164
2165 if hasattr(selectors, 'EpollSelector'):
2166 class ESC[4;38;5;81mEPollEventLoopTests(ESC[4;38;5;149mUnixEventLoopTestsMixin,
2167 ESC[4;38;5;149mSubprocessTestsMixin,
2168 ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2169
2170 def create_event_loop(self):
2171 return asyncio.SelectorEventLoop(selectors.EpollSelector())
2172
2173 if hasattr(selectors, 'PollSelector'):
2174 class ESC[4;38;5;81mPollEventLoopTests(ESC[4;38;5;149mUnixEventLoopTestsMixin,
2175 ESC[4;38;5;149mSubprocessTestsMixin,
2176 ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2177
2178 def create_event_loop(self):
2179 return asyncio.SelectorEventLoop(selectors.PollSelector())
2180
2181 # Should always exist.
2182 class ESC[4;38;5;81mSelectEventLoopTests(ESC[4;38;5;149mUnixEventLoopTestsMixin,
2183 ESC[4;38;5;149mSubprocessTestsMixin,
2184 ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2185
2186 def create_event_loop(self):
2187 return asyncio.SelectorEventLoop(selectors.SelectSelector())
2188
2189
2190 def noop(*args, **kwargs):
2191 pass
2192
2193
2194 class ESC[4;38;5;81mHandleTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2195
2196 def setUp(self):
2197 super().setUp()
2198 self.loop = mock.Mock()
2199 self.loop.get_debug.return_value = True
2200
2201 def test_handle(self):
2202 def callback(*args):
2203 return args
2204
2205 args = ()
2206 h = asyncio.Handle(callback, args, self.loop)
2207 self.assertIs(h._callback, callback)
2208 self.assertIs(h._args, args)
2209 self.assertFalse(h.cancelled())
2210
2211 h.cancel()
2212 self.assertTrue(h.cancelled())
2213
2214 def test_callback_with_exception(self):
2215 def callback():
2216 raise ValueError()
2217
2218 self.loop = mock.Mock()
2219 self.loop.call_exception_handler = mock.Mock()
2220
2221 h = asyncio.Handle(callback, (), self.loop)
2222 h._run()
2223
2224 self.loop.call_exception_handler.assert_called_with({
2225 'message': test_utils.MockPattern('Exception in callback.*'),
2226 'exception': mock.ANY,
2227 'handle': h,
2228 'source_traceback': h._source_traceback,
2229 })
2230
2231 def test_handle_weakref(self):
2232 wd = weakref.WeakValueDictionary()
2233 h = asyncio.Handle(lambda: None, (), self.loop)
2234 wd['h'] = h # Would fail without __weakref__ slot.
2235
2236 def test_handle_repr(self):
2237 self.loop.get_debug.return_value = False
2238
2239 # simple function
2240 h = asyncio.Handle(noop, (1, 2), self.loop)
2241 filename, lineno = test_utils.get_function_source(noop)
2242 self.assertEqual(repr(h),
2243 '<Handle noop(1, 2) at %s:%s>'
2244 % (filename, lineno))
2245
2246 # cancelled handle
2247 h.cancel()
2248 self.assertEqual(repr(h),
2249 '<Handle cancelled>')
2250
2251 # decorated function
2252 cb = types.coroutine(noop)
2253 h = asyncio.Handle(cb, (), self.loop)
2254 self.assertEqual(repr(h),
2255 '<Handle noop() at %s:%s>'
2256 % (filename, lineno))
2257
2258 # partial function
2259 cb = functools.partial(noop, 1, 2)
2260 h = asyncio.Handle(cb, (3,), self.loop)
2261 regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$'
2262 % (re.escape(filename), lineno))
2263 self.assertRegex(repr(h), regex)
2264
2265 # partial function with keyword args
2266 cb = functools.partial(noop, x=1)
2267 h = asyncio.Handle(cb, (2, 3), self.loop)
2268 regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$'
2269 % (re.escape(filename), lineno))
2270 self.assertRegex(repr(h), regex)
2271
2272 # partial method
2273 method = HandleTests.test_handle_repr
2274 cb = functools.partialmethod(method)
2275 filename, lineno = test_utils.get_function_source(method)
2276 h = asyncio.Handle(cb, (), self.loop)
2277
2278 cb_regex = r'<function HandleTests.test_handle_repr .*>'
2279 cb_regex = fr'functools.partialmethod\({cb_regex}, , \)\(\)'
2280 regex = fr'^<Handle {cb_regex} at {re.escape(filename)}:{lineno}>$'
2281 self.assertRegex(repr(h), regex)
2282
2283 def test_handle_repr_debug(self):
2284 self.loop.get_debug.return_value = True
2285
2286 # simple function
2287 create_filename = __file__
2288 create_lineno = sys._getframe().f_lineno + 1
2289 h = asyncio.Handle(noop, (1, 2), self.loop)
2290 filename, lineno = test_utils.get_function_source(noop)
2291 self.assertEqual(repr(h),
2292 '<Handle noop(1, 2) at %s:%s created at %s:%s>'
2293 % (filename, lineno, create_filename, create_lineno))
2294
2295 # cancelled handle
2296 h.cancel()
2297 self.assertEqual(
2298 repr(h),
2299 '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
2300 % (filename, lineno, create_filename, create_lineno))
2301
2302 # double cancellation won't overwrite _repr
2303 h.cancel()
2304 self.assertEqual(
2305 repr(h),
2306 '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
2307 % (filename, lineno, create_filename, create_lineno))
2308
2309 def test_handle_source_traceback(self):
2310 loop = asyncio.get_event_loop_policy().new_event_loop()
2311 loop.set_debug(True)
2312 self.set_event_loop(loop)
2313
2314 def check_source_traceback(h):
2315 lineno = sys._getframe(1).f_lineno - 1
2316 self.assertIsInstance(h._source_traceback, list)
2317 self.assertEqual(h._source_traceback[-1][:3],
2318 (__file__,
2319 lineno,
2320 'test_handle_source_traceback'))
2321
2322 # call_soon
2323 h = loop.call_soon(noop)
2324 check_source_traceback(h)
2325
2326 # call_soon_threadsafe
2327 h = loop.call_soon_threadsafe(noop)
2328 check_source_traceback(h)
2329
2330 # call_later
2331 h = loop.call_later(0, noop)
2332 check_source_traceback(h)
2333
2334 # call_at
2335 h = loop.call_later(0, noop)
2336 check_source_traceback(h)
2337
2338 @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'),
2339 'No collections.abc.Coroutine')
2340 def test_coroutine_like_object_debug_formatting(self):
2341 # Test that asyncio can format coroutines that are instances of
2342 # collections.abc.Coroutine, but lack cr_core or gi_code attributes
2343 # (such as ones compiled with Cython).
2344
2345 coro = CoroLike()
2346 coro.__name__ = 'AAA'
2347 self.assertTrue(asyncio.iscoroutine(coro))
2348 self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')
2349
2350 coro.__qualname__ = 'BBB'
2351 self.assertEqual(coroutines._format_coroutine(coro), 'BBB()')
2352
2353 coro.cr_running = True
2354 self.assertEqual(coroutines._format_coroutine(coro), 'BBB() running')
2355
2356 coro.__name__ = coro.__qualname__ = None
2357 self.assertEqual(coroutines._format_coroutine(coro),
2358 '<CoroLike without __name__>() running')
2359
2360 coro = CoroLike()
2361 coro.__qualname__ = 'CoroLike'
2362 # Some coroutines might not have '__name__', such as
2363 # built-in async_gen.asend().
2364 self.assertEqual(coroutines._format_coroutine(coro), 'CoroLike()')
2365
2366 coro = CoroLike()
2367 coro.__qualname__ = 'AAA'
2368 coro.cr_code = None
2369 self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')
2370
2371
2372 class ESC[4;38;5;81mTimerTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2373
2374 def setUp(self):
2375 super().setUp()
2376 self.loop = mock.Mock()
2377
2378 def test_hash(self):
2379 when = time.monotonic()
2380 h = asyncio.TimerHandle(when, lambda: False, (),
2381 mock.Mock())
2382 self.assertEqual(hash(h), hash(when))
2383
2384 def test_when(self):
2385 when = time.monotonic()
2386 h = asyncio.TimerHandle(when, lambda: False, (),
2387 mock.Mock())
2388 self.assertEqual(when, h.when())
2389
2390 def test_timer(self):
2391 def callback(*args):
2392 return args
2393
2394 args = (1, 2, 3)
2395 when = time.monotonic()
2396 h = asyncio.TimerHandle(when, callback, args, mock.Mock())
2397 self.assertIs(h._callback, callback)
2398 self.assertIs(h._args, args)
2399 self.assertFalse(h.cancelled())
2400
2401 # cancel
2402 h.cancel()
2403 self.assertTrue(h.cancelled())
2404 self.assertIsNone(h._callback)
2405 self.assertIsNone(h._args)
2406
2407
2408 def test_timer_repr(self):
2409 self.loop.get_debug.return_value = False
2410
2411 # simple function
2412 h = asyncio.TimerHandle(123, noop, (), self.loop)
2413 src = test_utils.get_function_source(noop)
2414 self.assertEqual(repr(h),
2415 '<TimerHandle when=123 noop() at %s:%s>' % src)
2416
2417 # cancelled handle
2418 h.cancel()
2419 self.assertEqual(repr(h),
2420 '<TimerHandle cancelled when=123>')
2421
2422 def test_timer_repr_debug(self):
2423 self.loop.get_debug.return_value = True
2424
2425 # simple function
2426 create_filename = __file__
2427 create_lineno = sys._getframe().f_lineno + 1
2428 h = asyncio.TimerHandle(123, noop, (), self.loop)
2429 filename, lineno = test_utils.get_function_source(noop)
2430 self.assertEqual(repr(h),
2431 '<TimerHandle when=123 noop() '
2432 'at %s:%s created at %s:%s>'
2433 % (filename, lineno, create_filename, create_lineno))
2434
2435 # cancelled handle
2436 h.cancel()
2437 self.assertEqual(repr(h),
2438 '<TimerHandle cancelled when=123 noop() '
2439 'at %s:%s created at %s:%s>'
2440 % (filename, lineno, create_filename, create_lineno))
2441
2442
2443 def test_timer_comparison(self):
2444 def callback(*args):
2445 return args
2446
2447 when = time.monotonic()
2448
2449 h1 = asyncio.TimerHandle(when, callback, (), self.loop)
2450 h2 = asyncio.TimerHandle(when, callback, (), self.loop)
2451 # TODO: Use assertLess etc.
2452 self.assertFalse(h1 < h2)
2453 self.assertFalse(h2 < h1)
2454 self.assertTrue(h1 <= h2)
2455 self.assertTrue(h2 <= h1)
2456 self.assertFalse(h1 > h2)
2457 self.assertFalse(h2 > h1)
2458 self.assertTrue(h1 >= h2)
2459 self.assertTrue(h2 >= h1)
2460 self.assertTrue(h1 == h2)
2461 self.assertFalse(h1 != h2)
2462
2463 h2.cancel()
2464 self.assertFalse(h1 == h2)
2465
2466 h1 = asyncio.TimerHandle(when, callback, (), self.loop)
2467 h2 = asyncio.TimerHandle(when + 10.0, callback, (), self.loop)
2468 self.assertTrue(h1 < h2)
2469 self.assertFalse(h2 < h1)
2470 self.assertTrue(h1 <= h2)
2471 self.assertFalse(h2 <= h1)
2472 self.assertFalse(h1 > h2)
2473 self.assertTrue(h2 > h1)
2474 self.assertFalse(h1 >= h2)
2475 self.assertTrue(h2 >= h1)
2476 self.assertFalse(h1 == h2)
2477 self.assertTrue(h1 != h2)
2478
2479 h3 = asyncio.Handle(callback, (), self.loop)
2480 self.assertIs(NotImplemented, h1.__eq__(h3))
2481 self.assertIs(NotImplemented, h1.__ne__(h3))
2482
2483 with self.assertRaises(TypeError):
2484 h1 < ()
2485 with self.assertRaises(TypeError):
2486 h1 > ()
2487 with self.assertRaises(TypeError):
2488 h1 <= ()
2489 with self.assertRaises(TypeError):
2490 h1 >= ()
2491 self.assertFalse(h1 == ())
2492 self.assertTrue(h1 != ())
2493
2494 self.assertTrue(h1 == ALWAYS_EQ)
2495 self.assertFalse(h1 != ALWAYS_EQ)
2496 self.assertTrue(h1 < LARGEST)
2497 self.assertFalse(h1 > LARGEST)
2498 self.assertTrue(h1 <= LARGEST)
2499 self.assertFalse(h1 >= LARGEST)
2500 self.assertFalse(h1 < SMALLEST)
2501 self.assertTrue(h1 > SMALLEST)
2502 self.assertFalse(h1 <= SMALLEST)
2503 self.assertTrue(h1 >= SMALLEST)
2504
2505
2506 class ESC[4;38;5;81mAbstractEventLoopTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2507
2508 def test_not_implemented(self):
2509 f = mock.Mock()
2510 loop = asyncio.AbstractEventLoop()
2511 self.assertRaises(
2512 NotImplementedError, loop.run_forever)
2513 self.assertRaises(
2514 NotImplementedError, loop.run_until_complete, None)
2515 self.assertRaises(
2516 NotImplementedError, loop.stop)
2517 self.assertRaises(
2518 NotImplementedError, loop.is_running)
2519 self.assertRaises(
2520 NotImplementedError, loop.is_closed)
2521 self.assertRaises(
2522 NotImplementedError, loop.close)
2523 self.assertRaises(
2524 NotImplementedError, loop.create_task, None)
2525 self.assertRaises(
2526 NotImplementedError, loop.call_later, None, None)
2527 self.assertRaises(
2528 NotImplementedError, loop.call_at, f, f)
2529 self.assertRaises(
2530 NotImplementedError, loop.call_soon, None)
2531 self.assertRaises(
2532 NotImplementedError, loop.time)
2533 self.assertRaises(
2534 NotImplementedError, loop.call_soon_threadsafe, None)
2535 self.assertRaises(
2536 NotImplementedError, loop.set_default_executor, f)
2537 self.assertRaises(
2538 NotImplementedError, loop.add_reader, 1, f)
2539 self.assertRaises(
2540 NotImplementedError, loop.remove_reader, 1)
2541 self.assertRaises(
2542 NotImplementedError, loop.add_writer, 1, f)
2543 self.assertRaises(
2544 NotImplementedError, loop.remove_writer, 1)
2545 self.assertRaises(
2546 NotImplementedError, loop.add_signal_handler, 1, f)
2547 self.assertRaises(
2548 NotImplementedError, loop.remove_signal_handler, 1)
2549 self.assertRaises(
2550 NotImplementedError, loop.remove_signal_handler, 1)
2551 self.assertRaises(
2552 NotImplementedError, loop.set_exception_handler, f)
2553 self.assertRaises(
2554 NotImplementedError, loop.default_exception_handler, f)
2555 self.assertRaises(
2556 NotImplementedError, loop.call_exception_handler, f)
2557 self.assertRaises(
2558 NotImplementedError, loop.get_debug)
2559 self.assertRaises(
2560 NotImplementedError, loop.set_debug, f)
2561
2562 def test_not_implemented_async(self):
2563
2564 async def inner():
2565 f = mock.Mock()
2566 loop = asyncio.AbstractEventLoop()
2567
2568 with self.assertRaises(NotImplementedError):
2569 await loop.run_in_executor(f, f)
2570 with self.assertRaises(NotImplementedError):
2571 await loop.getaddrinfo('localhost', 8080)
2572 with self.assertRaises(NotImplementedError):
2573 await loop.getnameinfo(('localhost', 8080))
2574 with self.assertRaises(NotImplementedError):
2575 await loop.create_connection(f)
2576 with self.assertRaises(NotImplementedError):
2577 await loop.create_server(f)
2578 with self.assertRaises(NotImplementedError):
2579 await loop.create_datagram_endpoint(f)
2580 with self.assertRaises(NotImplementedError):
2581 await loop.sock_recv(f, 10)
2582 with self.assertRaises(NotImplementedError):
2583 await loop.sock_recv_into(f, 10)
2584 with self.assertRaises(NotImplementedError):
2585 await loop.sock_sendall(f, 10)
2586 with self.assertRaises(NotImplementedError):
2587 await loop.sock_connect(f, f)
2588 with self.assertRaises(NotImplementedError):
2589 await loop.sock_accept(f)
2590 with self.assertRaises(NotImplementedError):
2591 await loop.sock_sendfile(f, f)
2592 with self.assertRaises(NotImplementedError):
2593 await loop.sendfile(f, f)
2594 with self.assertRaises(NotImplementedError):
2595 await loop.connect_read_pipe(f, mock.sentinel.pipe)
2596 with self.assertRaises(NotImplementedError):
2597 await loop.connect_write_pipe(f, mock.sentinel.pipe)
2598 with self.assertRaises(NotImplementedError):
2599 await loop.subprocess_shell(f, mock.sentinel)
2600 with self.assertRaises(NotImplementedError):
2601 await loop.subprocess_exec(f)
2602
2603 loop = asyncio.new_event_loop()
2604 loop.run_until_complete(inner())
2605 loop.close()
2606
2607
2608 class ESC[4;38;5;81mPolicyTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2609
2610 def test_event_loop_policy(self):
2611 policy = asyncio.AbstractEventLoopPolicy()
2612 self.assertRaises(NotImplementedError, policy.get_event_loop)
2613 self.assertRaises(NotImplementedError, policy.set_event_loop, object())
2614 self.assertRaises(NotImplementedError, policy.new_event_loop)
2615 self.assertRaises(NotImplementedError, policy.get_child_watcher)
2616 self.assertRaises(NotImplementedError, policy.set_child_watcher,
2617 object())
2618
2619 def test_get_event_loop(self):
2620 policy = asyncio.DefaultEventLoopPolicy()
2621 self.assertIsNone(policy._local._loop)
2622 with self.assertWarns(DeprecationWarning) as cm:
2623 loop = policy.get_event_loop()
2624 self.assertEqual(cm.filename, __file__)
2625 self.assertIsInstance(loop, asyncio.AbstractEventLoop)
2626
2627 self.assertIs(policy._local._loop, loop)
2628 self.assertIs(loop, policy.get_event_loop())
2629 loop.close()
2630
2631 def test_get_event_loop_calls_set_event_loop(self):
2632 policy = asyncio.DefaultEventLoopPolicy()
2633
2634 with mock.patch.object(
2635 policy, "set_event_loop",
2636 wraps=policy.set_event_loop) as m_set_event_loop:
2637
2638 with self.assertWarns(DeprecationWarning) as cm:
2639 loop = policy.get_event_loop()
2640 self.addCleanup(loop.close)
2641 self.assertEqual(cm.filename, __file__)
2642
2643 # policy._local._loop must be set through .set_event_loop()
2644 # (the unix DefaultEventLoopPolicy needs this call to attach
2645 # the child watcher correctly)
2646 m_set_event_loop.assert_called_with(loop)
2647
2648 loop.close()
2649
2650 def test_get_event_loop_after_set_none(self):
2651 policy = asyncio.DefaultEventLoopPolicy()
2652 policy.set_event_loop(None)
2653 self.assertRaises(RuntimeError, policy.get_event_loop)
2654
2655 @mock.patch('asyncio.events.threading.current_thread')
2656 def test_get_event_loop_thread(self, m_current_thread):
2657
2658 def f():
2659 policy = asyncio.DefaultEventLoopPolicy()
2660 self.assertRaises(RuntimeError, policy.get_event_loop)
2661
2662 th = threading.Thread(target=f)
2663 th.start()
2664 th.join()
2665
2666 def test_new_event_loop(self):
2667 policy = asyncio.DefaultEventLoopPolicy()
2668
2669 loop = policy.new_event_loop()
2670 self.assertIsInstance(loop, asyncio.AbstractEventLoop)
2671 loop.close()
2672
2673 def test_set_event_loop(self):
2674 policy = asyncio.DefaultEventLoopPolicy()
2675 old_loop = policy.new_event_loop()
2676 policy.set_event_loop(old_loop)
2677
2678 self.assertRaises(TypeError, policy.set_event_loop, object())
2679
2680 loop = policy.new_event_loop()
2681 policy.set_event_loop(loop)
2682 self.assertIs(loop, policy.get_event_loop())
2683 self.assertIsNot(old_loop, policy.get_event_loop())
2684 loop.close()
2685 old_loop.close()
2686
2687 def test_get_event_loop_policy(self):
2688 policy = asyncio.get_event_loop_policy()
2689 self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
2690 self.assertIs(policy, asyncio.get_event_loop_policy())
2691
2692 def test_set_event_loop_policy(self):
2693 self.assertRaises(
2694 TypeError, asyncio.set_event_loop_policy, object())
2695
2696 old_policy = asyncio.get_event_loop_policy()
2697
2698 policy = asyncio.DefaultEventLoopPolicy()
2699 asyncio.set_event_loop_policy(policy)
2700 self.assertIs(policy, asyncio.get_event_loop_policy())
2701 self.assertIsNot(policy, old_policy)
2702
2703
2704 class ESC[4;38;5;81mGetEventLoopTestsMixin:
2705
2706 _get_running_loop_impl = None
2707 _set_running_loop_impl = None
2708 get_running_loop_impl = None
2709 get_event_loop_impl = None
2710
2711 def setUp(self):
2712 self._get_running_loop_saved = events._get_running_loop
2713 self._set_running_loop_saved = events._set_running_loop
2714 self.get_running_loop_saved = events.get_running_loop
2715 self.get_event_loop_saved = events.get_event_loop
2716
2717 events._get_running_loop = type(self)._get_running_loop_impl
2718 events._set_running_loop = type(self)._set_running_loop_impl
2719 events.get_running_loop = type(self).get_running_loop_impl
2720 events.get_event_loop = type(self).get_event_loop_impl
2721
2722 asyncio._get_running_loop = type(self)._get_running_loop_impl
2723 asyncio._set_running_loop = type(self)._set_running_loop_impl
2724 asyncio.get_running_loop = type(self).get_running_loop_impl
2725 asyncio.get_event_loop = type(self).get_event_loop_impl
2726
2727 super().setUp()
2728
2729 self.loop = asyncio.new_event_loop()
2730 asyncio.set_event_loop(self.loop)
2731
2732 if sys.platform != 'win32':
2733 with warnings.catch_warnings():
2734 warnings.simplefilter('ignore', DeprecationWarning)
2735 watcher = asyncio.SafeChildWatcher()
2736 watcher.attach_loop(self.loop)
2737 asyncio.set_child_watcher(watcher)
2738
2739 def tearDown(self):
2740 try:
2741 if sys.platform != 'win32':
2742 with warnings.catch_warnings():
2743 warnings.simplefilter('ignore', DeprecationWarning)
2744 asyncio.set_child_watcher(None)
2745
2746 super().tearDown()
2747 finally:
2748 self.loop.close()
2749 asyncio.set_event_loop(None)
2750
2751 events._get_running_loop = self._get_running_loop_saved
2752 events._set_running_loop = self._set_running_loop_saved
2753 events.get_running_loop = self.get_running_loop_saved
2754 events.get_event_loop = self.get_event_loop_saved
2755
2756 asyncio._get_running_loop = self._get_running_loop_saved
2757 asyncio._set_running_loop = self._set_running_loop_saved
2758 asyncio.get_running_loop = self.get_running_loop_saved
2759 asyncio.get_event_loop = self.get_event_loop_saved
2760
2761 if sys.platform != 'win32':
2762
2763 def test_get_event_loop_new_process(self):
2764 # bpo-32126: The multiprocessing module used by
2765 # ProcessPoolExecutor is not functional when the
2766 # multiprocessing.synchronize module cannot be imported.
2767 support.skip_if_broken_multiprocessing_synchronize()
2768
2769 self.addCleanup(multiprocessing_cleanup_tests)
2770
2771 async def main():
2772 if multiprocessing.get_start_method() == 'fork':
2773 # Avoid 'fork' DeprecationWarning.
2774 mp_context = multiprocessing.get_context('forkserver')
2775 else:
2776 mp_context = None
2777 pool = concurrent.futures.ProcessPoolExecutor(
2778 mp_context=mp_context)
2779 result = await self.loop.run_in_executor(
2780 pool, _test_get_event_loop_new_process__sub_proc)
2781 pool.shutdown()
2782 return result
2783
2784 self.assertEqual(
2785 self.loop.run_until_complete(main()),
2786 'hello')
2787
2788 def test_get_event_loop_returns_running_loop(self):
2789 class ESC[4;38;5;81mTestError(ESC[4;38;5;149mException):
2790 pass
2791
2792 class ESC[4;38;5;81mPolicy(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mDefaultEventLoopPolicy):
2793 def get_event_loop(self):
2794 raise TestError
2795
2796 old_policy = asyncio.get_event_loop_policy()
2797 try:
2798 asyncio.set_event_loop_policy(Policy())
2799 loop = asyncio.new_event_loop()
2800
2801 with self.assertRaises(TestError):
2802 asyncio.get_event_loop()
2803 asyncio.set_event_loop(None)
2804 with self.assertRaises(TestError):
2805 asyncio.get_event_loop()
2806
2807 with self.assertRaisesRegex(RuntimeError, 'no running'):
2808 asyncio.get_running_loop()
2809 self.assertIs(asyncio._get_running_loop(), None)
2810
2811 async def func():
2812 self.assertIs(asyncio.get_event_loop(), loop)
2813 self.assertIs(asyncio.get_running_loop(), loop)
2814 self.assertIs(asyncio._get_running_loop(), loop)
2815
2816 loop.run_until_complete(func())
2817
2818 asyncio.set_event_loop(loop)
2819 with self.assertRaises(TestError):
2820 asyncio.get_event_loop()
2821 asyncio.set_event_loop(None)
2822 with self.assertRaises(TestError):
2823 asyncio.get_event_loop()
2824
2825 finally:
2826 asyncio.set_event_loop_policy(old_policy)
2827 if loop is not None:
2828 loop.close()
2829
2830 with self.assertRaisesRegex(RuntimeError, 'no running'):
2831 asyncio.get_running_loop()
2832
2833 self.assertIs(asyncio._get_running_loop(), None)
2834
2835 def test_get_event_loop_returns_running_loop2(self):
2836 old_policy = asyncio.get_event_loop_policy()
2837 try:
2838 asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
2839 loop = asyncio.new_event_loop()
2840 self.addCleanup(loop.close)
2841
2842 with self.assertWarns(DeprecationWarning) as cm:
2843 loop2 = asyncio.get_event_loop()
2844 self.addCleanup(loop2.close)
2845 self.assertEqual(cm.filename, __file__)
2846 asyncio.set_event_loop(None)
2847 with self.assertRaisesRegex(RuntimeError, 'no current'):
2848 asyncio.get_event_loop()
2849
2850 with self.assertRaisesRegex(RuntimeError, 'no running'):
2851 asyncio.get_running_loop()
2852 self.assertIs(asyncio._get_running_loop(), None)
2853
2854 async def func():
2855 self.assertIs(asyncio.get_event_loop(), loop)
2856 self.assertIs(asyncio.get_running_loop(), loop)
2857 self.assertIs(asyncio._get_running_loop(), loop)
2858
2859 loop.run_until_complete(func())
2860
2861 asyncio.set_event_loop(loop)
2862 self.assertIs(asyncio.get_event_loop(), loop)
2863
2864 asyncio.set_event_loop(None)
2865 with self.assertRaisesRegex(RuntimeError, 'no current'):
2866 asyncio.get_event_loop()
2867
2868 finally:
2869 asyncio.set_event_loop_policy(old_policy)
2870 if loop is not None:
2871 loop.close()
2872
2873 with self.assertRaisesRegex(RuntimeError, 'no running'):
2874 asyncio.get_running_loop()
2875
2876 self.assertIs(asyncio._get_running_loop(), None)
2877
2878
2879 class ESC[4;38;5;81mTestPyGetEventLoop(ESC[4;38;5;149mGetEventLoopTestsMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2880
2881 _get_running_loop_impl = events._py__get_running_loop
2882 _set_running_loop_impl = events._py__set_running_loop
2883 get_running_loop_impl = events._py_get_running_loop
2884 get_event_loop_impl = events._py_get_event_loop
2885
2886
2887 try:
2888 import _asyncio # NoQA
2889 except ImportError:
2890 pass
2891 else:
2892
2893 class ESC[4;38;5;81mTestCGetEventLoop(ESC[4;38;5;149mGetEventLoopTestsMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2894
2895 _get_running_loop_impl = events._c__get_running_loop
2896 _set_running_loop_impl = events._c__set_running_loop
2897 get_running_loop_impl = events._c_get_running_loop
2898 get_event_loop_impl = events._c_get_event_loop
2899
2900
2901 class ESC[4;38;5;81mTestServer(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2902
2903 def test_get_loop(self):
2904 loop = asyncio.new_event_loop()
2905 self.addCleanup(loop.close)
2906 proto = MyProto(loop)
2907 server = loop.run_until_complete(loop.create_server(lambda: proto, '0.0.0.0', 0))
2908 self.assertEqual(server.get_loop(), loop)
2909 server.close()
2910 loop.run_until_complete(server.wait_closed())
2911
2912
2913 class ESC[4;38;5;81mTestAbstractServer(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2914
2915 def test_close(self):
2916 with self.assertRaises(NotImplementedError):
2917 events.AbstractServer().close()
2918
2919 def test_wait_closed(self):
2920 loop = asyncio.new_event_loop()
2921 self.addCleanup(loop.close)
2922
2923 with self.assertRaises(NotImplementedError):
2924 loop.run_until_complete(events.AbstractServer().wait_closed())
2925
2926 def test_get_loop(self):
2927 with self.assertRaises(NotImplementedError):
2928 events.AbstractServer().get_loop()
2929
2930
2931 if __name__ == '__main__':
2932 unittest.main()