1 """Tests support for new syntax introduced by PEP 492."""
2
3 import sys
4 import types
5 import unittest
6
7 from unittest import mock
8
9 import asyncio
10 from test.test_asyncio import utils as test_utils
11
12
def tearDownModule():
    """unittest module-level teardown hook.

    Passes ``None`` to ``asyncio.set_event_loop_policy()`` so that no
    policy set while this module's tests ran stays installed.
    """
    asyncio.set_event_loop_policy(None)
15
16
17 # Test that asyncio.iscoroutine() uses collections.abc.Coroutine
class FakeCoro:
    """Hand-written stand-in that only exposes the coroutine methods.

    It is not created by ``async def``; it merely provides ``send``,
    ``throw``, ``close`` and ``__await__`` so that interface-based
    coroutine checks can be exercised against it.
    """

    def send(self, value):
        # Intentionally a no-op.
        return None

    def throw(self, typ, val=None, tb=None):
        # Intentionally a no-op.
        return None

    def close(self):
        # Intentionally a no-op.
        return None

    def __await__(self):
        # The ``yield`` makes this a generator function, so calling it
        # returns an iterator that produces a single ``None``.
        yield None
30
31
class BaseTest(test_utils.TestCase):
    """Common fixture: every test gets a stubbed event loop in ``self.loop``."""

    def setUp(self):
        """Create a ``BaseEventLoop`` with mocked internals and register it."""
        super().setUp()

        self.loop = loop = asyncio.BaseEventLoop()

        # Replace the event-processing hook with a mock.
        loop._process_events = mock.Mock()

        # Mock selector whose select() always returns an empty tuple.
        selector = mock.Mock()
        selector.select.return_value = ()
        loop._selector = selector

        self.set_event_loop(loop)
41
42
class LockTests(BaseTest):
    """``async with`` / ``with await`` behaviour of the sync primitives."""

    @staticmethod
    def _fresh_primitives():
        # One new, unacquired instance of each primitive under test.
        return [
            asyncio.Lock(),
            asyncio.Condition(),
            asyncio.Semaphore(),
            asyncio.BoundedSemaphore(),
        ]

    def test_context_manager_async_with(self):
        # Each primitive must be held inside ``async with`` (which binds
        # None to the ``as`` target) and be released again afterwards.
        async def exercise(primitive):
            await asyncio.sleep(0.01)
            self.assertFalse(primitive.locked())
            async with primitive as bound:
                self.assertIsNone(bound)
                self.assertTrue(primitive.locked())
                await asyncio.sleep(0.01)
                self.assertTrue(primitive.locked())
            self.assertFalse(primitive.locked())

        for primitive in self._fresh_primitives():
            self.loop.run_until_complete(exercise(primitive))
            self.assertFalse(primitive.locked())

    def test_context_manager_with_await(self):
        # ``with await <primitive>`` must fail with TypeError and leave
        # the primitive unlocked.
        async def exercise(primitive):
            await asyncio.sleep(0.01)
            self.assertFalse(primitive.locked())
            expected_error = self.assertRaisesRegex(
                TypeError,
                "can't be used in 'await' expression",
            )
            with expected_error:
                with await primitive:
                    pass

        for primitive in self._fresh_primitives():
            self.loop.run_until_complete(exercise(primitive))
            self.assertFalse(primitive.locked())
88
89
class StreamReaderTests(BaseTest):
    """``async for`` iteration over an ``asyncio.StreamReader``."""

    def test_readline(self):
        # Iterating the reader must yield the fed bytes split after each
        # newline, with the final unterminated line returned as-is.
        payload = b'line1\nline2\nline3'
        expected = [b'line1\n', b'line2\n', b'line3']

        reader = asyncio.StreamReader(loop=self.loop)
        reader.feed_data(payload)
        reader.feed_eof()

        async def collect():
            return [chunk async for chunk in reader]

        received = self.loop.run_until_complete(collect())
        self.assertEqual(received, expected)
107
108
class CoroutineTests(BaseTest):
    """Tests of how asyncio handles ``async def`` coroutines."""

    def test_iscoroutine(self):
        # iscoroutine() must accept a native coroutine object ...
        async def foo(): pass

        f = foo()
        try:
            self.assertTrue(asyncio.iscoroutine(f))
        finally:
            f.close()  # silence warning

        # ... as well as an object that only implements the interface.
        self.assertTrue(asyncio.iscoroutine(FakeCoro()))

    def test_iscoroutinefunction(self):
        async def foo(): pass
        self.assertTrue(asyncio.iscoroutinefunction(foo))

    def test_async_def_coroutines(self):
        # A coroutine awaiting another coroutine must return its result,
        # both with and without loop debug mode.
        async def bar():
            return 'spam'

        async def foo():
            return await bar()

        # production mode
        data = self.loop.run_until_complete(foo())
        self.assertEqual(data, 'spam')

        # debug mode
        self.loop.set_debug(True)
        data = self.loop.run_until_complete(foo())
        self.assertEqual(data, 'spam')

    def test_debug_mode_manages_coroutine_origin_tracking(self):
        # Origin tracking must be enabled only while the debug-mode loop
        # is running, and be back at 0 once it has stopped.
        async def start():
            # assertGreater reports the actual depth on failure.
            self.assertGreater(sys.get_coroutine_origin_tracking_depth(), 0)

        self.assertEqual(sys.get_coroutine_origin_tracking_depth(), 0)
        self.loop.set_debug(True)
        self.loop.run_until_complete(start())
        self.assertEqual(sys.get_coroutine_origin_tracking_depth(), 0)

    def test_types_coroutine(self):
        # A @types.coroutine function that returns a generator object
        # must produce an awaitable wrapper yielding the generator's
        # return value.
        def gen():
            yield from ()
            return 'spam'

        @types.coroutine
        def func():
            return gen()

        async def coro():
            wrapper = func()
            self.assertIsInstance(wrapper, types._GeneratorWrapper)
            return await wrapper

        data = self.loop.run_until_complete(coro())
        self.assertEqual(data, 'spam')

    def test_task_print_stack(self):
        # Task.get_stack() called from inside the task's own coroutine
        # must report that coroutine's frame.
        T = None

        async def foo():
            f = T.get_stack(limit=1)
            try:
                self.assertEqual(f[0].f_code.co_name, 'foo')
            finally:
                # Drop the reference to the frame list.
                f = None

        async def runner():
            nonlocal T
            T = asyncio.ensure_future(foo(), loop=self.loop)
            await T

        self.loop.run_until_complete(runner())

    def test_double_await(self):
        # Awaiting a coroutine object that a task is already running
        # must raise RuntimeError.
        async def afunc():
            await asyncio.sleep(0.1)

        async def runner():
            coro = afunc()
            t = self.loop.create_task(coro)
            try:
                # Yield to the loop once before awaiting ``coro`` again.
                await asyncio.sleep(0)
                await coro
            finally:
                t.cancel()

        self.loop.set_debug(True)
        # assertRaises() only uses ``msg`` as the failure message, so it
        # never checked the exception text; assertRaisesRegex() does.
        with self.assertRaisesRegex(
                RuntimeError,
                'coroutine is being awaited already'):
            self.loop.run_until_complete(runner())
203
204
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()