1 # IsolatedAsyncioTestCase based tests
2 import asyncio
3 import traceback
4 import unittest
5 from asyncio import tasks
6
7
8 def tearDownModule():
9 asyncio.set_event_loop_policy(None)
10
11
12 class ESC[4;38;5;81mFutureTests:
13
14 async def test_future_traceback(self):
15
16 async def raise_exc():
17 raise TypeError(42)
18
19 future = self.cls(raise_exc())
20
21 for _ in range(5):
22 try:
23 await future
24 except TypeError as e:
25 tb = ''.join(traceback.format_tb(e.__traceback__))
26 self.assertEqual(tb.count("await future"), 1)
27 else:
28 self.fail('TypeError was not raised')
29
30 @unittest.skipUnless(hasattr(tasks, '_CTask'),
31 'requires the C _asyncio module')
32 class ESC[4;38;5;81mCFutureTests(ESC[4;38;5;149mFutureTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
33 cls = tasks._CTask
34
35 class ESC[4;38;5;81mPyFutureTests(ESC[4;38;5;149mFutureTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
36 cls = tasks._PyTask
37
38 class ESC[4;38;5;81mFutureReprTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
39
40 async def test_recursive_repr_for_pending_tasks(self):
41 # The call crashes if the guard for recursive call
42 # in base_futures:_future_repr_info is absent
43 # See Also: https://bugs.python.org/issue42183
44
45 async def func():
46 return asyncio.all_tasks()
47
48 # The repr() call should not raise RecursiveError at first.
49 # The check for returned string is not very reliable but
50 # exact comparison for the whole string is even weaker.
51 self.assertIn('...', repr(await asyncio.wait_for(func(), timeout=10)))
52
53
54 if __name__ == '__main__':
55 unittest.main()