# python (3.11.7)
import asyncio
import contextvars
import traceback
import unittest
from test import support
5
# Module-level gate evaluated at import time, before any test is collected.
# NOTE(review): presumably this skips the whole module on platforms without
# usable sockets (the tests below all start asyncio event loops) -- confirm
# against test.support.requires_working_socket.
support.requires_working_socket(module=True)
7
8
class MyException(Exception):
    """Marker exception that the nested test cases in this module raise on purpose."""
11
12
def tearDownModule():
    """Reset the process-wide asyncio event loop policy.

    Passing ``None`` asks asyncio to fall back to its default policy, so
    nothing configured while this module ran leaks into later modules.
    """
    asyncio.set_event_loop_policy(None)
15
16
class TestCM:
    """Asynchronous context manager that records when it is entered and left.

    ``ordering`` is a caller-owned list that receives ``'enter'`` and
    ``'exit'`` markers; ``enter_result`` is the value handed back by
    ``__aenter__`` (``None`` by default).
    """

    def __init__(self, ordering, enter_result=None):
        self.ordering = ordering
        self.enter_result = enter_result

    async def __aenter__(self):
        self.ordering.append('enter')
        return self.enter_result

    async def __aexit__(self, *exc_info):
        # Returns None, so an exception raised inside the block propagates.
        self.ordering.append('exit')
28
29
class LacksEnterAndExit:
    """Plain object with neither ``__aenter__`` nor ``__aexit__``."""
class LacksEnter:
    """Defines ``__aexit__`` only, so it is not a complete async context manager."""

    async def __aexit__(self, *exc_info):
        pass
class LacksExit:
    """Defines ``__aenter__`` only, so it is not a complete async context manager."""

    async def __aenter__(self):
        pass
38
39
# Context variable to which test_full_cycle's hooks append their names; the
# empty-tuple default lets each hook extend it with ``VAR.get() + (...)``.
VAR = contextvars.ContextVar('VAR', default=())
41
42
class TestAsyncCase(unittest.TestCase):
    """Behavioural checks for ``unittest.IsolatedAsyncioTestCase``.

    Each test defines a throw-away ``Test`` subclass, drives it with
    ``run()`` (and, where relevant, with ``debug()`` followed by an explicit
    ``doCleanups()``), and compares what was recorded with what is expected.
    """

    # Event lists are compared as a whole; never truncate their diffs.
    maxDiff = None

    def setUp(self):
        # Ensure that IsolatedAsyncioTestCase instances are destroyed before
        # starting a new event loop
        self.addCleanup(support.gc_collect)

    def _assert_debug_raises(self, test):
        """Call ``test.debug()`` and fail unless ``MyException`` escapes from it."""
        with self.assertRaises(MyException,
                               msg='Expected a MyException exception'):
            test.debug()

    def test_full_cycle(self):
        # Every hook checks the events recorded so far (a prefix of
        # ``expected``), appends its own name, and extends VAR.  The last
        # cleanup copies VAR into ``cvar`` so the outer test can verify that
        # all hooks observed one shared context.
        expected = ['setUp',
                    'asyncSetUp',
                    'test',
                    'asyncTearDown',
                    'tearDown',
                    'cleanup6',
                    'cleanup5',
                    'cleanup4',
                    'cleanup3',
                    'cleanup2',
                    'cleanup1']

        class Test(unittest.IsolatedAsyncioTestCase):
            def setUp(self):
                self.assertEqual(events, [])
                events.append('setUp')
                VAR.set(VAR.get() + ('setUp',))
                self.addCleanup(self.on_cleanup1)
                self.addAsyncCleanup(self.on_cleanup2)

            async def asyncSetUp(self):
                self.assertEqual(events, expected[:1])
                events.append('asyncSetUp')
                VAR.set(VAR.get() + ('asyncSetUp',))
                self.addCleanup(self.on_cleanup3)
                self.addAsyncCleanup(self.on_cleanup4)

            async def test_func(self):
                self.assertEqual(events, expected[:2])
                events.append('test')
                VAR.set(VAR.get() + ('test',))
                self.addCleanup(self.on_cleanup5)
                self.addAsyncCleanup(self.on_cleanup6)

            async def asyncTearDown(self):
                self.assertEqual(events, expected[:3])
                events.append('asyncTearDown')
                VAR.set(VAR.get() + ('asyncTearDown',))

            def tearDown(self):
                self.assertEqual(events, expected[:4])
                events.append('tearDown')
                VAR.set(VAR.get() + ('tearDown',))

            def on_cleanup1(self):
                nonlocal cvar
                self.assertEqual(events, expected[:10])
                events.append('cleanup1')
                VAR.set(VAR.get() + ('cleanup1',))
                cvar = VAR.get()

            async def on_cleanup2(self):
                self.assertEqual(events, expected[:9])
                events.append('cleanup2')
                VAR.set(VAR.get() + ('cleanup2',))

            def on_cleanup3(self):
                self.assertEqual(events, expected[:8])
                events.append('cleanup3')
                VAR.set(VAR.get() + ('cleanup3',))

            async def on_cleanup4(self):
                self.assertEqual(events, expected[:7])
                events.append('cleanup4')
                VAR.set(VAR.get() + ('cleanup4',))

            def on_cleanup5(self):
                self.assertEqual(events, expected[:6])
                events.append('cleanup5')
                VAR.set(VAR.get() + ('cleanup5',))

            async def on_cleanup6(self):
                self.assertEqual(events, expected[:5])
                events.append('cleanup6')
                VAR.set(VAR.get() + ('cleanup6',))

        events = []
        cvar = ()
        test = Test("test_func")
        result = test.run()
        self.assertEqual(result.errors, [])
        self.assertEqual(result.failures, [])
        self.assertEqual(events, expected)
        self.assertEqual(cvar, tuple(expected))

        events = []
        cvar = ()
        test = Test("test_func")
        test.debug()
        self.assertEqual(events, expected)
        self.assertEqual(cvar, tuple(expected))
        # A second doCleanups() must find nothing left to run.
        test.doCleanups()
        self.assertEqual(events, expected)
        self.assertEqual(cvar, tuple(expected))

    def test_exception_in_setup(self):
        # A failing asyncSetUp skips the test body and asyncTearDown, but the
        # cleanup registered before the failure still runs.
        class Test(unittest.IsolatedAsyncioTestCase):
            async def asyncSetUp(self):
                events.append('asyncSetUp')
                self.addAsyncCleanup(self.on_cleanup)
                raise MyException()

            async def test_func(self):
                events.append('test')

            async def asyncTearDown(self):
                events.append('asyncTearDown')

            async def on_cleanup(self):
                events.append('cleanup')

        events = []
        test = Test("test_func")
        result = test.run()
        self.assertEqual(events, ['asyncSetUp', 'cleanup'])
        self.assertIs(result.errors[0][0], test)
        self.assertIn('MyException', result.errors[0][1])

        # debug() lets the exception escape and leaves cleanups to the caller.
        events = []
        test = Test("test_func")
        self.addCleanup(test._tearDownAsyncioRunner)
        self._assert_debug_raises(test)
        self.assertEqual(events, ['asyncSetUp'])
        test.doCleanups()
        self.assertEqual(events, ['asyncSetUp', 'cleanup'])

    def test_exception_in_test(self):
        # A failing test body still gets asyncTearDown and its cleanups
        # under run().
        class Test(unittest.IsolatedAsyncioTestCase):
            async def asyncSetUp(self):
                events.append('asyncSetUp')

            async def test_func(self):
                events.append('test')
                self.addAsyncCleanup(self.on_cleanup)
                raise MyException()

            async def asyncTearDown(self):
                events.append('asyncTearDown')

            async def on_cleanup(self):
                events.append('cleanup')

        events = []
        test = Test("test_func")
        result = test.run()
        self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
        self.assertIs(result.errors[0][0], test)
        self.assertIn('MyException', result.errors[0][1])

        events = []
        test = Test("test_func")
        self.addCleanup(test._tearDownAsyncioRunner)
        self._assert_debug_raises(test)
        self.assertEqual(events, ['asyncSetUp', 'test'])
        test.doCleanups()
        self.assertEqual(events, ['asyncSetUp', 'test', 'cleanup'])

    def test_exception_in_tear_down(self):
        # A failing asyncTearDown is reported as an error; cleanups still run.
        class Test(unittest.IsolatedAsyncioTestCase):
            async def asyncSetUp(self):
                events.append('asyncSetUp')

            async def test_func(self):
                events.append('test')
                self.addAsyncCleanup(self.on_cleanup)

            async def asyncTearDown(self):
                events.append('asyncTearDown')
                raise MyException()

            async def on_cleanup(self):
                events.append('cleanup')

        events = []
        test = Test("test_func")
        result = test.run()
        self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
        self.assertIs(result.errors[0][0], test)
        self.assertIn('MyException', result.errors[0][1])

        events = []
        test = Test("test_func")
        self.addCleanup(test._tearDownAsyncioRunner)
        self._assert_debug_raises(test)
        self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown'])
        test.doCleanups()
        self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])

    def test_exception_in_tear_clean_up(self):
        # Both cleanups raise: run() records both errors in LIFO order, while
        # debug() stops at the first one and leaves the other pending.
        class Test(unittest.IsolatedAsyncioTestCase):
            async def asyncSetUp(self):
                events.append('asyncSetUp')

            async def test_func(self):
                events.append('test')
                self.addAsyncCleanup(self.on_cleanup1)
                self.addAsyncCleanup(self.on_cleanup2)

            async def asyncTearDown(self):
                events.append('asyncTearDown')

            async def on_cleanup1(self):
                events.append('cleanup1')
                raise MyException('some error')

            async def on_cleanup2(self):
                events.append('cleanup2')
                raise MyException('other error')

        events = []
        test = Test("test_func")
        result = test.run()
        self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1'])
        self.assertIs(result.errors[0][0], test)
        self.assertIn('MyException: other error', result.errors[0][1])
        self.assertIn('MyException: some error', result.errors[1][1])

        events = []
        test = Test("test_func")
        self.addCleanup(test._tearDownAsyncioRunner)
        self._assert_debug_raises(test)
        self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2'])
        test.doCleanups()
        self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1'])

    def test_deprecation_of_return_val_from_test(self):
        # Issue 41322 - deprecate return of value that is not None from a test
        class Nothing:
            # Compares equal to None without being None, so an ``== None``
            # check in the implementation would wrongly let it through.
            def __eq__(self, o):
                return o is None

        class Test(unittest.IsolatedAsyncioTestCase):
            async def test1(self):
                return 1
            async def test2(self):
                yield 1
            async def test3(self):
                return Nothing()

        for name in ('test1', 'test2', 'test3'):
            with self.subTest(name=name):
                with self.assertWarns(DeprecationWarning) as w:
                    Test(name).run()
                self.assertIn('It is deprecated to return a value that is not None', str(w.warning))
                self.assertIn(name, str(w.warning))
                self.assertEqual(w.filename, __file__)

    def test_cleanups_interleave_order(self):
        # Synchronous and asynchronous cleanups share one LIFO stack, so they
        # must run in exact reverse registration order regardless of kind.
        events = []

        class Test(unittest.IsolatedAsyncioTestCase):
            async def test_func(self):
                self.addCleanup(self.on_sync_cleanup, 1)
                self.addAsyncCleanup(self.on_async_cleanup, 2)
                self.addCleanup(self.on_sync_cleanup, 3)
                self.addAsyncCleanup(self.on_async_cleanup, 4)

            def on_sync_cleanup(self, val):
                events.append(f'sync_cleanup {val}')

            async def on_async_cleanup(self, val):
                events.append(f'async_cleanup {val}')

        test = Test("test_func")
        test.run()
        self.assertEqual(events, ['async_cleanup 4',
                                  'sync_cleanup 3',
                                  'async_cleanup 2',
                                  'sync_cleanup 1'])

    def test_base_exception_from_async_method(self):
        # A BaseException (or CancelledError) raised by one test must fail
        # that test without preventing later tests from running.
        events = []

        class Test(unittest.IsolatedAsyncioTestCase):
            async def test_base(self):
                events.append("test_base")
                raise BaseException()
                # Deliberately unreachable: must never show up in ``events``.
                events.append("not it")

            async def test_no_err(self):
                events.append("test_no_err")

            async def test_cancel(self):
                raise asyncio.CancelledError()

        test = Test("test_base")
        output = test.run()
        self.assertFalse(output.wasSuccessful())

        test = Test("test_no_err")
        test.run()
        self.assertEqual(events, ['test_base', 'test_no_err'])

        test = Test("test_cancel")
        output = test.run()
        self.assertFalse(output.wasSuccessful())

    def test_cancellation_hanging_tasks(self):
        # A task still pending when the test returns must be cancelled.
        cancelled = False

        class Test(unittest.IsolatedAsyncioTestCase):
            async def test_leaking_task(self):
                async def coro():
                    nonlocal cancelled
                    try:
                        await asyncio.sleep(1)
                    except asyncio.CancelledError:
                        cancelled = True
                        raise

                # Leave this running in the background
                asyncio.create_task(coro())

        test = Test("test_leaking_task")
        test.run()
        self.assertTrue(cancelled)

    def test_enterAsyncContext(self):
        # The context manager's exit is scheduled like a cleanup, so it runs
        # between the cleanups registered after and before it.
        events = []

        class Test(unittest.IsolatedAsyncioTestCase):
            # ``slf`` is the inner case; ``self`` stays bound to the outer one.
            async def test_func(slf):
                slf.addAsyncCleanup(events.append, 'cleanup1')
                cm = TestCM(events, 42)
                self.assertEqual(await slf.enterAsyncContext(cm), 42)
                slf.addAsyncCleanup(events.append, 'cleanup2')
                events.append('test')

        test = Test('test_func')
        output = test.run()
        self.assertTrue(output.wasSuccessful(), output)
        self.assertEqual(events, ['enter', 'test', 'cleanup2', 'exit', 'cleanup1'])

    def test_enterAsyncContext_arg_errors(self):
        # Objects missing either half of the protocol are rejected with
        # TypeError.
        class Test(unittest.IsolatedAsyncioTestCase):
            async def test_func(slf):
                with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
                    await slf.enterAsyncContext(LacksEnterAndExit())
                with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
                    await slf.enterAsyncContext(LacksEnter())
                with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
                    await slf.enterAsyncContext(LacksExit())

        test = Test('test_func')
        output = test.run()
        self.assertTrue(output.wasSuccessful())

    def test_debug_cleanup_same_loop(self):
        # The cleanup awaits a future created in asyncSetUp, which only works
        # if setup and cleanup run on the same event loop -- also after debug().
        class Test(unittest.IsolatedAsyncioTestCase):
            async def asyncSetUp(self):
                async def coro():
                    await asyncio.sleep(0)
                fut = asyncio.ensure_future(coro())
                self.addAsyncCleanup(self.cleanup, fut)
                events.append('asyncSetUp')

            async def test_func(self):
                events.append('test')
                raise MyException()

            async def asyncTearDown(self):
                events.append('asyncTearDown')

            async def cleanup(self, fut):
                try:
                    # Raises an exception if in different loop
                    await asyncio.wait([fut])
                    events.append('cleanup')
                except BaseException:
                    # Print the traceback for diagnosis, then always re-raise.
                    traceback.print_exc()
                    raise

        events = []
        test = Test("test_func")
        result = test.run()
        self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
        self.assertIn('MyException', result.errors[0][1])

        events = []
        test = Test("test_func")
        self.addCleanup(test._tearDownAsyncioRunner)
        self._assert_debug_raises(test)
        self.assertEqual(events, ['asyncSetUp', 'test'])
        test.doCleanups()
        self.assertEqual(events, ['asyncSetUp', 'test', 'cleanup'])

    def test_setup_get_event_loop(self):
        # See https://github.com/python/cpython/issues/95736
        # Make sure the default event loop is not used
        asyncio.set_event_loop(None)

        class TestCase1(unittest.IsolatedAsyncioTestCase):
            def setUp(self):
                asyncio.get_event_loop_policy().get_event_loop()

            async def test_demo1(self):
                pass

        test = TestCase1('test_demo1')
        result = test.run()
        self.assertTrue(result.wasSuccessful())
486
487 if __name__ == "__main__":
488 unittest.main()