(root)/
Python-3.12.0/
Lib/
test/
test_asyncio/
test_pep492.py
       1  """Tests support for new syntax introduced by PEP 492."""
       2  
       3  import sys
       4  import types
       5  import unittest
       6  
       7  from unittest import mock
       8  
       9  import asyncio
      10  from test.test_asyncio import utils as test_utils
      11  
      12  
      13  def tearDownModule():
      14      asyncio.set_event_loop_policy(None)
      15  
      16  
      17  # Test that asyncio.iscoroutine() uses collections.abc.Coroutine
      18  class ESC[4;38;5;81mFakeCoro:
      19      def send(self, value):
      20          pass
      21  
      22      def throw(self, typ, val=None, tb=None):
      23          pass
      24  
      25      def close(self):
      26          pass
      27  
      28      def __await__(self):
      29          yield
      30  
      31  
      32  class ESC[4;38;5;81mBaseTest(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      33  
      34      def setUp(self):
      35          super().setUp()
      36          self.loop = asyncio.BaseEventLoop()
      37          self.loop._process_events = mock.Mock()
      38          self.loop._selector = mock.Mock()
      39          self.loop._selector.select.return_value = ()
      40          self.set_event_loop(self.loop)
      41  
      42  
      43  class ESC[4;38;5;81mLockTests(ESC[4;38;5;149mBaseTest):
      44  
      45      def test_context_manager_async_with(self):
      46          primitives = [
      47              asyncio.Lock(),
      48              asyncio.Condition(),
      49              asyncio.Semaphore(),
      50              asyncio.BoundedSemaphore(),
      51          ]
      52  
      53          async def test(lock):
      54              await asyncio.sleep(0.01)
      55              self.assertFalse(lock.locked())
      56              async with lock as _lock:
      57                  self.assertIs(_lock, None)
      58                  self.assertTrue(lock.locked())
      59                  await asyncio.sleep(0.01)
      60                  self.assertTrue(lock.locked())
      61              self.assertFalse(lock.locked())
      62  
      63          for primitive in primitives:
      64              self.loop.run_until_complete(test(primitive))
      65              self.assertFalse(primitive.locked())
      66  
      67      def test_context_manager_with_await(self):
      68          primitives = [
      69              asyncio.Lock(),
      70              asyncio.Condition(),
      71              asyncio.Semaphore(),
      72              asyncio.BoundedSemaphore(),
      73          ]
      74  
      75          async def test(lock):
      76              await asyncio.sleep(0.01)
      77              self.assertFalse(lock.locked())
      78              with self.assertRaisesRegex(
      79                  TypeError,
      80                  "can't be used in 'await' expression"
      81              ):
      82                  with await lock:
      83                      pass
      84  
      85          for primitive in primitives:
      86              self.loop.run_until_complete(test(primitive))
      87              self.assertFalse(primitive.locked())
      88  
      89  
      90  class ESC[4;38;5;81mStreamReaderTests(ESC[4;38;5;149mBaseTest):
      91  
      92      def test_readline(self):
      93          DATA = b'line1\nline2\nline3'
      94  
      95          stream = asyncio.StreamReader(loop=self.loop)
      96          stream.feed_data(DATA)
      97          stream.feed_eof()
      98  
      99          async def reader():
     100              data = []
     101              async for line in stream:
     102                  data.append(line)
     103              return data
     104  
     105          data = self.loop.run_until_complete(reader())
     106          self.assertEqual(data, [b'line1\n', b'line2\n', b'line3'])
     107  
     108  
     109  class ESC[4;38;5;81mCoroutineTests(ESC[4;38;5;149mBaseTest):
     110  
     111      def test_iscoroutine(self):
     112          async def foo(): pass
     113  
     114          f = foo()
     115          try:
     116              self.assertTrue(asyncio.iscoroutine(f))
     117          finally:
     118              f.close() # silence warning
     119  
     120          self.assertTrue(asyncio.iscoroutine(FakeCoro()))
     121  
     122      def test_iscoroutine_generator(self):
     123          def foo(): yield
     124  
     125          self.assertFalse(asyncio.iscoroutine(foo()))
     126  
     127  
     128      def test_iscoroutinefunction(self):
     129          async def foo(): pass
     130          self.assertTrue(asyncio.iscoroutinefunction(foo))
     131  
     132      def test_async_def_coroutines(self):
     133          async def bar():
     134              return 'spam'
     135          async def foo():
     136              return await bar()
     137  
     138          # production mode
     139          data = self.loop.run_until_complete(foo())
     140          self.assertEqual(data, 'spam')
     141  
     142          # debug mode
     143          self.loop.set_debug(True)
     144          data = self.loop.run_until_complete(foo())
     145          self.assertEqual(data, 'spam')
     146  
     147      def test_debug_mode_manages_coroutine_origin_tracking(self):
     148          async def start():
     149              self.assertTrue(sys.get_coroutine_origin_tracking_depth() > 0)
     150  
     151          self.assertEqual(sys.get_coroutine_origin_tracking_depth(), 0)
     152          self.loop.set_debug(True)
     153          self.loop.run_until_complete(start())
     154          self.assertEqual(sys.get_coroutine_origin_tracking_depth(), 0)
     155  
     156      def test_types_coroutine(self):
     157          def gen():
     158              yield from ()
     159              return 'spam'
     160  
     161          @types.coroutine
     162          def func():
     163              return gen()
     164  
     165          async def coro():
     166              wrapper = func()
     167              self.assertIsInstance(wrapper, types._GeneratorWrapper)
     168              return await wrapper
     169  
     170          data = self.loop.run_until_complete(coro())
     171          self.assertEqual(data, 'spam')
     172  
     173      def test_task_print_stack(self):
     174          T = None
     175  
     176          async def foo():
     177              f = T.get_stack(limit=1)
     178              try:
     179                  self.assertEqual(f[0].f_code.co_name, 'foo')
     180              finally:
     181                  f = None
     182  
     183          async def runner():
     184              nonlocal T
     185              T = asyncio.ensure_future(foo(), loop=self.loop)
     186              await T
     187  
     188          self.loop.run_until_complete(runner())
     189  
     190      def test_double_await(self):
     191          async def afunc():
     192              await asyncio.sleep(0.1)
     193  
     194          async def runner():
     195              coro = afunc()
     196              t = self.loop.create_task(coro)
     197              try:
     198                  await asyncio.sleep(0)
     199                  await coro
     200              finally:
     201                  t.cancel()
     202  
     203          self.loop.set_debug(True)
     204          with self.assertRaises(
     205                  RuntimeError,
     206                  msg='coroutine is being awaited already'):
     207  
     208              self.loop.run_until_complete(runner())
     209  
     210  
     211  if __name__ == '__main__':
     212      unittest.main()