python (3.12.0)
1 """Tests support for new syntax introduced by PEP 492."""
2
3 import sys
4 import types
5 import unittest
6
7 from unittest import mock
8
9 import asyncio
10 from test.test_asyncio import utils as test_utils
11
12
def tearDownModule():
    """Module-level unittest hook: reset the asyncio event loop policy.

    Called by unittest after the tests in this module have run.  Passing
    ``None`` asks asyncio to go back to its default policy, so whatever
    these tests installed does not carry over to other test modules.
    """
    asyncio.set_event_loop_policy(None)
15
16
# Test that asyncio.iscoroutine() uses collections.abc.Coroutine
class FakeCoro:
    """Hand-written stand-in that only *looks* like a coroutine.

    It inherits from nothing special; it merely defines ``send``, ``throw``,
    ``close`` and ``__await__``.  The tests pass an instance to
    ``asyncio.iscoroutine()`` to check that such a duck-typed object is
    accepted.
    """

    def send(self, value):
        """Accept *value* and do nothing (returns ``None``)."""

    def throw(self, typ, val=None, tb=None):
        """Accept the exception arguments and do nothing (returns ``None``)."""

    def close(self):
        """Do nothing (returns ``None``)."""

    def __await__(self):
        """Return a generator that yields ``None`` exactly once."""
        yield
30
31
class BaseTest(test_utils.TestCase):
    """Shared fixture: gives every test a ``BaseEventLoop`` with mocked I/O."""

    def setUp(self):
        """Create ``self.loop`` and register it through ``set_event_loop``."""
        super().setUp()
        self.loop = asyncio.BaseEventLoop()
        # Replace the event-processing hook and the selector with mocks so
        # the loop can be driven without a real selector behind it.
        self.loop._process_events = mock.Mock()
        self.loop._selector = mock.Mock()
        # Every select() call reports "no ready events".
        self.loop._selector.select.return_value = ()
        self.set_event_loop(self.loop)
41
42
class LockTests(BaseTest):
    """How asyncio's synchronization primitives interact with PEP 492 syntax."""

    def test_context_manager_async_with(self):
        """``async with`` holds the primitive inside the block and frees it after."""
        primitives = [
            asyncio.Lock(),
            asyncio.Condition(),
            asyncio.Semaphore(),
            asyncio.BoundedSemaphore(),
        ]

        async def test(lock):
            await asyncio.sleep(0.01)
            self.assertFalse(lock.locked())
            async with lock as _lock:
                # The ``as`` target is expected to be bound to None.
                self.assertIsNone(_lock)
                self.assertTrue(lock.locked())
                # Still held after yielding control to the loop.
                await asyncio.sleep(0.01)
                self.assertTrue(lock.locked())
            self.assertFalse(lock.locked())

        for primitive in primitives:
            # subTest makes a failure report say which primitive misbehaved.
            with self.subTest(primitive=type(primitive).__name__):
                self.loop.run_until_complete(test(primitive))
                self.assertFalse(primitive.locked())

    def test_context_manager_with_await(self):
        """``with await lock`` is rejected with TypeError and leaves it unlocked."""
        primitives = [
            asyncio.Lock(),
            asyncio.Condition(),
            asyncio.Semaphore(),
            asyncio.BoundedSemaphore(),
        ]

        async def test(lock):
            await asyncio.sleep(0.01)
            self.assertFalse(lock.locked())
            with self.assertRaisesRegex(
                TypeError,
                "can't be used in 'await' expression"
            ):
                with await lock:
                    pass

        for primitive in primitives:
            # subTest makes a failure report say which primitive misbehaved.
            with self.subTest(primitive=type(primitive).__name__):
                self.loop.run_until_complete(test(primitive))
                self.assertFalse(primitive.locked())
88
89
class StreamReaderTests(BaseTest):
    """``asyncio.StreamReader`` used as an asynchronous iterator."""

    def test_readline(self):
        """``async for`` over a fed stream yields its lines in order.

        The final chunk has no trailing newline and is still expected as
        the last item once EOF has been fed.
        """
        DATA = b'line1\nline2\nline3'

        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_data(DATA)
        stream.feed_eof()

        async def reader():
            data = []
            async for line in stream:
                data.append(line)
            return data

        data = self.loop.run_until_complete(reader())
        self.assertEqual(data, [b'line1\n', b'line2\n', b'line3'])
107
108
class CoroutineTests(BaseTest):
    """asyncio's handling of ``async def`` coroutines and related helpers."""

    def test_iscoroutine(self):
        """``iscoroutine()`` accepts a native coroutine and a ``FakeCoro``."""
        async def foo(): pass

        f = foo()
        try:
            self.assertTrue(asyncio.iscoroutine(f))
        finally:
            f.close()  # silence warning

        self.assertTrue(asyncio.iscoroutine(FakeCoro()))

    def test_iscoroutine_generator(self):
        """``iscoroutine()`` rejects a plain generator object."""
        def foo(): yield

        self.assertFalse(asyncio.iscoroutine(foo()))

    def test_iscoroutinefunction(self):
        """``iscoroutinefunction()`` accepts an ``async def`` function."""
        async def foo(): pass
        self.assertTrue(asyncio.iscoroutinefunction(foo))

    def test_async_def_coroutines(self):
        """A coroutine awaiting another returns its result, debug on or off."""
        async def bar():
            return 'spam'

        async def foo():
            return await bar()

        # production mode
        data = self.loop.run_until_complete(foo())
        self.assertEqual(data, 'spam')

        # debug mode
        self.loop.set_debug(True)
        data = self.loop.run_until_complete(foo())
        self.assertEqual(data, 'spam')

    def test_debug_mode_manages_coroutine_origin_tracking(self):
        """Origin tracking is on while a debug loop runs and 0 before/after."""
        async def start():
            self.assertGreater(sys.get_coroutine_origin_tracking_depth(), 0)

        self.assertEqual(sys.get_coroutine_origin_tracking_depth(), 0)
        self.loop.set_debug(True)
        self.loop.run_until_complete(start())
        self.assertEqual(sys.get_coroutine_origin_tracking_depth(), 0)

    def test_types_coroutine(self):
        """A ``@types.coroutine`` function returning a generator is awaitable."""
        def gen():
            yield from ()
            return 'spam'

        @types.coroutine
        def func():
            return gen()

        async def coro():
            wrapper = func()
            self.assertIsInstance(wrapper, types._GeneratorWrapper)
            return await wrapper

        data = self.loop.run_until_complete(coro())
        self.assertEqual(data, 'spam')

    def test_task_print_stack(self):
        """``Task.get_stack(limit=1)`` called from inside the task shows it."""
        T = None

        async def foo():
            f = T.get_stack(limit=1)
            try:
                self.assertEqual(f[0].f_code.co_name, 'foo')
            finally:
                # Drop the frame list explicitly (presumably so the frames
                # are not kept alive longer than needed).
                f = None

        async def runner():
            nonlocal T
            T = asyncio.ensure_future(foo(), loop=self.loop)
            await T

        self.loop.run_until_complete(runner())

    def test_double_await(self):
        """Awaiting a coroutine that a task is already running raises."""
        async def afunc():
            await asyncio.sleep(0.1)

        async def runner():
            coro = afunc()
            t = self.loop.create_task(coro)
            try:
                # Let the task start, so ``coro`` is suspended in its own
                # await by the time we try to await it a second time.
                await asyncio.sleep(0)
                await coro
            finally:
                t.cancel()

        self.loop.set_debug(True)
        # assertRaises(..., msg=...) would only set the failure message and
        # never inspect the exception; assertRaisesRegex actually checks
        # the error text.
        with self.assertRaisesRegex(
                RuntimeError,
                'coroutine is being awaited already'):
            self.loop.run_until_complete(runner())
209
210
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()