1 import asyncio
2 import time
3 import threading
4 import unittest
5
6 from test.support import socket_helper
7 from test.test_asyncio import utils as test_utils
8 from test.test_asyncio import functional as func_tests
9
10
def tearDownModule():
    """Restore the default asyncio event loop policy after all tests."""
    # Passing None asks asyncio to fall back to its default policy, so any
    # policy installed while these tests ran does not outlive the module.
    asyncio.set_event_loop_policy(None)
13
14
class BaseStartServer(func_tests.FunctionalTestCaseMixin):
    """Event-loop-agnostic tests for ``asyncio.start_server()``.

    Concrete test classes combine this mixin with ``unittest.TestCase`` and
    override ``new_loop()`` to choose the event loop implementation.
    """

    def new_loop(self):
        """Return the event loop to test; subclasses must override this."""
        raise NotImplementedError

    def test_start_server_1(self):
        # A server created with start_serving=False must not serve until
        # serve_forever() runs, and must be fully closed once the task
        # running serve_forever() has been cancelled.
        HELLO_MSG = b'1' * 1024 * 5 + b'\n'

        def client(sock, addr):
            # Blocking client; it has to run concurrently with the event
            # loop, which is busy in run_until_complete() below.
            try:
                # Poll for up to ~2 seconds until serve_forever() has put
                # the server into the serving state.
                for _ in range(10):
                    time.sleep(0.2)
                    if srv.is_serving():
                        break
                else:
                    raise RuntimeError('server did not start serving in time')

                sock.settimeout(2)
                sock.connect(addr)
                # sendall() keeps writing until the whole payload is out;
                # a bare send() may transmit only part of it.
                sock.sendall(HELLO_MSG)
                sock.recv_all(1)
            finally:
                # Release the socket even if one of the steps above failed.
                sock.close()

        async def serve(reader, writer):
            await reader.readline()
            # Cancel serve_forever() from inside the connection handler;
            # the reply below is still written before the handler returns.
            main_task.cancel()
            writer.write(b'1')
            writer.close()
            await writer.wait_closed()

        async def main(srv):
            async with srv:
                await srv.serve_forever()

        srv = self.loop.run_until_complete(asyncio.start_server(
            serve, socket_helper.HOSTv4, 0, start_serving=False))

        self.assertFalse(srv.is_serving())

        main_task = self.loop.create_task(main(srv))

        # Port 0 was requested, so read the actual address from the socket.
        addr = srv.sockets[0].getsockname()
        with self.assertRaises(asyncio.CancelledError):
            with self.tcp_client(lambda sock: client(sock, addr)):
                self.loop.run_until_complete(main_task)

        self.assertEqual(srv.sockets, ())

        self.assertIsNone(srv._sockets)
        self.assertIsNone(srv._waiters)
        self.assertFalse(srv.is_serving())

        # A closed server must refuse to serve again.
        with self.assertRaisesRegex(RuntimeError, r'is closed'):
            self.loop.run_until_complete(srv.serve_forever())
68
69
class SelectorStartServerTests(BaseStartServer, unittest.TestCase):
    """Run the start_server() tests on ``asyncio.SelectorEventLoop``.

    Also adds a Unix domain socket variant of the server lifecycle test.
    """

    def new_loop(self):
        """Return a fresh ``asyncio.SelectorEventLoop``."""
        return asyncio.SelectorEventLoop()

    @socket_helper.skip_unless_bind_unix_socket
    def test_start_unix_server_1(self):
        # A Unix server created with start_serving=False must serve only
        # after start_serving(), and must be fully closed once the task
        # running serve_forever() has been cancelled.
        HELLO_MSG = b'1' * 1024 * 5 + b'\n'
        # Set by main() as soon as the server reports that it is serving.
        started = threading.Event()

        def client(sock, addr):
            # Blocking client; it has to run concurrently with the event
            # loop, which is busy in run_until_complete() below.
            try:
                sock.settimeout(2)
                # Fail with a clear error instead of attempting to connect
                # to a server that never started serving.
                if not started.wait(5):
                    raise RuntimeError('server did not start serving in time')
                sock.connect(addr)
                # sendall() keeps writing until the whole payload is out;
                # a bare send() may transmit only part of it.
                sock.sendall(HELLO_MSG)
                sock.recv_all(1)
            finally:
                # Release the socket even if one of the steps above failed.
                sock.close()

        async def serve(reader, writer):
            await reader.readline()
            # Cancel serve_forever() from inside the connection handler;
            # the reply below is still written before the handler returns.
            main_task.cancel()
            writer.write(b'1')
            writer.close()
            await writer.wait_closed()

        async def main(srv):
            async with srv:
                self.assertFalse(srv.is_serving())
                await srv.start_serving()
                self.assertTrue(srv.is_serving())
                # Unblock the client only once the server is serving.
                started.set()
                await srv.serve_forever()

        with test_utils.unix_socket_path() as addr:
            srv = self.loop.run_until_complete(asyncio.start_unix_server(
                serve, addr, start_serving=False))

            main_task = self.loop.create_task(main(srv))

            with self.assertRaises(asyncio.CancelledError):
                with self.unix_client(lambda sock: client(sock, addr)):
                    self.loop.run_until_complete(main_task)

            self.assertEqual(srv.sockets, ())

            self.assertIsNone(srv._sockets)
            self.assertIsNone(srv._waiters)
            self.assertFalse(srv.is_serving())

            # A closed server must refuse to serve again.
            with self.assertRaisesRegex(RuntimeError, r'is closed'):
                self.loop.run_until_complete(srv.serve_forever())
121
122
@unittest.skipUnless(hasattr(asyncio, 'ProactorEventLoop'), 'Windows only')
class ProactorStartServerTests(BaseStartServer, unittest.TestCase):
    """Run the start_server() tests on ``asyncio.ProactorEventLoop``.

    The whole class is skipped when asyncio does not provide that loop.
    """

    def new_loop(self):
        """Return a fresh ``asyncio.ProactorEventLoop``."""
        loop = asyncio.ProactorEventLoop()
        return loop
128
129
# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()