(root)/
Python-3.11.7/
Lib/
test/
test_contextlib_async.py
       1  import asyncio
       2  from contextlib import (
       3      asynccontextmanager, AbstractAsyncContextManager,
       4      AsyncExitStack, nullcontext, aclosing, contextmanager)
       5  import functools
       6  from test import support
       7  import unittest
       8  import traceback
       9  
      10  from test.test_contextlib import TestBaseExitStack
      11  
# Module-level guard evaluated at import time.
# NOTE(review): presumably this skips the whole module (module=True) on
# platforms without usable sockets -- confirm against test.support.
support.requires_working_socket(module=True)
      13  
      14  def _async_test(func):
      15      """Decorator to turn an async function into a test case."""
      16      @functools.wraps(func)
      17      def wrapper(*args, **kwargs):
      18          coro = func(*args, **kwargs)
      19          asyncio.run(coro)
      20      return wrapper
      21  
      22  def tearDownModule():
      23      asyncio.set_event_loop_policy(None)
      24  
      25  
      26  class ESC[4;38;5;81mTestAbstractAsyncContextManager(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      27  
      28      @_async_test
      29      async def test_enter(self):
      30          class ESC[4;38;5;81mDefaultEnter(ESC[4;38;5;149mAbstractAsyncContextManager):
      31              async def __aexit__(self, *args):
      32                  await super().__aexit__(*args)
      33  
      34          manager = DefaultEnter()
      35          self.assertIs(await manager.__aenter__(), manager)
      36  
      37          async with manager as context:
      38              self.assertIs(manager, context)
      39  
      40      @_async_test
      41      async def test_async_gen_propagates_generator_exit(self):
      42          # A regression test for https://bugs.python.org/issue33786.
      43  
      44          @asynccontextmanager
      45          async def ctx():
      46              yield
      47  
      48          async def gen():
      49              async with ctx():
      50                  yield 11
      51  
      52          g = gen()
      53          async for val in g:
      54              self.assertEqual(val, 11)
      55              break
      56          await g.aclose()
      57  
      58      def test_exit_is_abstract(self):
      59          class ESC[4;38;5;81mMissingAexit(ESC[4;38;5;149mAbstractAsyncContextManager):
      60              pass
      61  
      62          with self.assertRaises(TypeError):
      63              MissingAexit()
      64  
      65      def test_structural_subclassing(self):
      66          class ESC[4;38;5;81mManagerFromScratch:
      67              async def __aenter__(self):
      68                  return self
      69              async def __aexit__(self, exc_type, exc_value, traceback):
      70                  return None
      71  
      72          self.assertTrue(issubclass(ManagerFromScratch, AbstractAsyncContextManager))
      73  
      74          class ESC[4;38;5;81mDefaultEnter(ESC[4;38;5;149mAbstractAsyncContextManager):
      75              async def __aexit__(self, *args):
      76                  await super().__aexit__(*args)
      77  
      78          self.assertTrue(issubclass(DefaultEnter, AbstractAsyncContextManager))
      79  
      80          class ESC[4;38;5;81mNoneAenter(ESC[4;38;5;149mManagerFromScratch):
      81              __aenter__ = None
      82  
      83          self.assertFalse(issubclass(NoneAenter, AbstractAsyncContextManager))
      84  
      85          class ESC[4;38;5;81mNoneAexit(ESC[4;38;5;149mManagerFromScratch):
      86              __aexit__ = None
      87  
      88          self.assertFalse(issubclass(NoneAexit, AbstractAsyncContextManager))
      89  
      90  
      91  class ESC[4;38;5;81mAsyncContextManagerTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      92  
      93      @_async_test
      94      async def test_contextmanager_plain(self):
      95          state = []
      96          @asynccontextmanager
      97          async def woohoo():
      98              state.append(1)
      99              yield 42
     100              state.append(999)
     101          async with woohoo() as x:
     102              self.assertEqual(state, [1])
     103              self.assertEqual(x, 42)
     104              state.append(x)
     105          self.assertEqual(state, [1, 42, 999])
     106  
     107      @_async_test
     108      async def test_contextmanager_finally(self):
     109          state = []
     110          @asynccontextmanager
     111          async def woohoo():
     112              state.append(1)
     113              try:
     114                  yield 42
     115              finally:
     116                  state.append(999)
     117          with self.assertRaises(ZeroDivisionError):
     118              async with woohoo() as x:
     119                  self.assertEqual(state, [1])
     120                  self.assertEqual(x, 42)
     121                  state.append(x)
     122                  raise ZeroDivisionError()
     123          self.assertEqual(state, [1, 42, 999])
     124  
     125      @_async_test
     126      async def test_contextmanager_traceback(self):
     127          @asynccontextmanager
     128          async def f():
     129              yield
     130  
     131          try:
     132              async with f():
     133                  1/0
     134          except ZeroDivisionError as e:
     135              frames = traceback.extract_tb(e.__traceback__)
     136  
     137          self.assertEqual(len(frames), 1)
     138          self.assertEqual(frames[0].name, 'test_contextmanager_traceback')
     139          self.assertEqual(frames[0].line, '1/0')
     140  
     141          # Repeat with RuntimeError (which goes through a different code path)
     142          class ESC[4;38;5;81mRuntimeErrorSubclass(ESC[4;38;5;149mRuntimeError):
     143              pass
     144  
     145          try:
     146              async with f():
     147                  raise RuntimeErrorSubclass(42)
     148          except RuntimeErrorSubclass as e:
     149              frames = traceback.extract_tb(e.__traceback__)
     150  
     151          self.assertEqual(len(frames), 1)
     152          self.assertEqual(frames[0].name, 'test_contextmanager_traceback')
     153          self.assertEqual(frames[0].line, 'raise RuntimeErrorSubclass(42)')
     154  
     155          class ESC[4;38;5;81mStopIterationSubclass(ESC[4;38;5;149mStopIteration):
     156              pass
     157  
     158          class ESC[4;38;5;81mStopAsyncIterationSubclass(ESC[4;38;5;149mStopAsyncIteration):
     159              pass
     160  
     161          for stop_exc in (
     162              StopIteration('spam'),
     163              StopAsyncIteration('ham'),
     164              StopIterationSubclass('spam'),
     165              StopAsyncIterationSubclass('spam')
     166          ):
     167              with self.subTest(type=type(stop_exc)):
     168                  try:
     169                      async with f():
     170                          raise stop_exc
     171                  except type(stop_exc) as e:
     172                      self.assertIs(e, stop_exc)
     173                      frames = traceback.extract_tb(e.__traceback__)
     174                  else:
     175                      self.fail(f'{stop_exc} was suppressed')
     176  
     177                  self.assertEqual(len(frames), 1)
     178                  self.assertEqual(frames[0].name, 'test_contextmanager_traceback')
     179                  self.assertEqual(frames[0].line, 'raise stop_exc')
     180  
     181      @_async_test
     182      async def test_contextmanager_no_reraise(self):
     183          @asynccontextmanager
     184          async def whee():
     185              yield
     186          ctx = whee()
     187          await ctx.__aenter__()
     188          # Calling __aexit__ should not result in an exception
     189          self.assertFalse(await ctx.__aexit__(TypeError, TypeError("foo"), None))
     190  
     191      @_async_test
     192      async def test_contextmanager_trap_yield_after_throw(self):
     193          @asynccontextmanager
     194          async def whoo():
     195              try:
     196                  yield
     197              except:
     198                  yield
     199          ctx = whoo()
     200          await ctx.__aenter__()
     201          with self.assertRaises(RuntimeError):
     202              await ctx.__aexit__(TypeError, TypeError('foo'), None)
     203  
     204      @_async_test
     205      async def test_contextmanager_trap_no_yield(self):
     206          @asynccontextmanager
     207          async def whoo():
     208              if False:
     209                  yield
     210          ctx = whoo()
     211          with self.assertRaises(RuntimeError):
     212              await ctx.__aenter__()
     213  
     214      @_async_test
     215      async def test_contextmanager_trap_second_yield(self):
     216          @asynccontextmanager
     217          async def whoo():
     218              yield
     219              yield
     220          ctx = whoo()
     221          await ctx.__aenter__()
     222          with self.assertRaises(RuntimeError):
     223              await ctx.__aexit__(None, None, None)
     224  
     225      @_async_test
     226      async def test_contextmanager_non_normalised(self):
     227          @asynccontextmanager
     228          async def whoo():
     229              try:
     230                  yield
     231              except RuntimeError:
     232                  raise SyntaxError
     233  
     234          ctx = whoo()
     235          await ctx.__aenter__()
     236          with self.assertRaises(SyntaxError):
     237              await ctx.__aexit__(RuntimeError, None, None)
     238  
     239      @_async_test
     240      async def test_contextmanager_except(self):
     241          state = []
     242          @asynccontextmanager
     243          async def woohoo():
     244              state.append(1)
     245              try:
     246                  yield 42
     247              except ZeroDivisionError as e:
     248                  state.append(e.args[0])
     249                  self.assertEqual(state, [1, 42, 999])
     250          async with woohoo() as x:
     251              self.assertEqual(state, [1])
     252              self.assertEqual(x, 42)
     253              state.append(x)
     254              raise ZeroDivisionError(999)
     255          self.assertEqual(state, [1, 42, 999])
     256  
     257      @_async_test
     258      async def test_contextmanager_except_stopiter(self):
     259          @asynccontextmanager
     260          async def woohoo():
     261              yield
     262  
     263          class ESC[4;38;5;81mStopIterationSubclass(ESC[4;38;5;149mStopIteration):
     264              pass
     265  
     266          class ESC[4;38;5;81mStopAsyncIterationSubclass(ESC[4;38;5;149mStopAsyncIteration):
     267              pass
     268  
     269          for stop_exc in (
     270              StopIteration('spam'),
     271              StopAsyncIteration('ham'),
     272              StopIterationSubclass('spam'),
     273              StopAsyncIterationSubclass('spam')
     274          ):
     275              with self.subTest(type=type(stop_exc)):
     276                  try:
     277                      async with woohoo():
     278                          raise stop_exc
     279                  except Exception as ex:
     280                      self.assertIs(ex, stop_exc)
     281                  else:
     282                      self.fail(f'{stop_exc} was suppressed')
     283  
     284      @_async_test
     285      async def test_contextmanager_wrap_runtimeerror(self):
     286          @asynccontextmanager
     287          async def woohoo():
     288              try:
     289                  yield
     290              except Exception as exc:
     291                  raise RuntimeError(f'caught {exc}') from exc
     292  
     293          with self.assertRaises(RuntimeError):
     294              async with woohoo():
     295                  1 / 0
     296  
     297          # If the context manager wrapped StopAsyncIteration in a RuntimeError,
     298          # we also unwrap it, because we can't tell whether the wrapping was
     299          # done by the generator machinery or by the generator itself.
     300          with self.assertRaises(StopAsyncIteration):
     301              async with woohoo():
     302                  raise StopAsyncIteration
     303  
     304      def _create_contextmanager_attribs(self):
     305          def attribs(**kw):
     306              def decorate(func):
     307                  for k,v in kw.items():
     308                      setattr(func,k,v)
     309                  return func
     310              return decorate
     311          @asynccontextmanager
     312          @attribs(foo='bar')
     313          async def baz(spam):
     314              """Whee!"""
     315              yield
     316          return baz
     317  
     318      def test_contextmanager_attribs(self):
     319          baz = self._create_contextmanager_attribs()
     320          self.assertEqual(baz.__name__,'baz')
     321          self.assertEqual(baz.foo, 'bar')
     322  
     323      @support.requires_docstrings
     324      def test_contextmanager_doc_attrib(self):
     325          baz = self._create_contextmanager_attribs()
     326          self.assertEqual(baz.__doc__, "Whee!")
     327  
     328      @support.requires_docstrings
     329      @_async_test
     330      async def test_instance_docstring_given_cm_docstring(self):
     331          baz = self._create_contextmanager_attribs()(None)
     332          self.assertEqual(baz.__doc__, "Whee!")
     333          async with baz:
     334              pass  # suppress warning
     335  
     336      @_async_test
     337      async def test_keywords(self):
     338          # Ensure no keyword arguments are inhibited
     339          @asynccontextmanager
     340          async def woohoo(self, func, args, kwds):
     341              yield (self, func, args, kwds)
     342          async with woohoo(self=11, func=22, args=33, kwds=44) as target:
     343              self.assertEqual(target, (11, 22, 33, 44))
     344  
     345      @_async_test
     346      async def test_recursive(self):
     347          depth = 0
     348          ncols = 0
     349  
     350          @asynccontextmanager
     351          async def woohoo():
     352              nonlocal ncols
     353              ncols += 1
     354  
     355              nonlocal depth
     356              before = depth
     357              depth += 1
     358              yield
     359              depth -= 1
     360              self.assertEqual(depth, before)
     361  
     362          @woohoo()
     363          async def recursive():
     364              if depth < 10:
     365                  await recursive()
     366  
     367          await recursive()
     368  
     369          self.assertEqual(ncols, 10)
     370          self.assertEqual(depth, 0)
     371  
     372      @_async_test
     373      async def test_decorator(self):
     374          entered = False
     375  
     376          @asynccontextmanager
     377          async def context():
     378              nonlocal entered
     379              entered = True
     380              yield
     381              entered = False
     382  
     383          @context()
     384          async def test():
     385              self.assertTrue(entered)
     386  
     387          self.assertFalse(entered)
     388          await test()
     389          self.assertFalse(entered)
     390  
     391      @_async_test
     392      async def test_decorator_with_exception(self):
     393          entered = False
     394  
     395          @asynccontextmanager
     396          async def context():
     397              nonlocal entered
     398              try:
     399                  entered = True
     400                  yield
     401              finally:
     402                  entered = False
     403  
     404          @context()
     405          async def test():
     406              self.assertTrue(entered)
     407              raise NameError('foo')
     408  
     409          self.assertFalse(entered)
     410          with self.assertRaisesRegex(NameError, 'foo'):
     411              await test()
     412          self.assertFalse(entered)
     413  
     414      @_async_test
     415      async def test_decorating_method(self):
     416  
     417          @asynccontextmanager
     418          async def context():
     419              yield
     420  
     421  
     422          class ESC[4;38;5;81mTest(ESC[4;38;5;149mobject):
     423  
     424              @context()
     425              async def method(self, a, b, c=None):
     426                  self.a = a
     427                  self.b = b
     428                  self.c = c
     429  
     430          # these tests are for argument passing when used as a decorator
     431          test = Test()
     432          await test.method(1, 2)
     433          self.assertEqual(test.a, 1)
     434          self.assertEqual(test.b, 2)
     435          self.assertEqual(test.c, None)
     436  
     437          test = Test()
     438          await test.method('a', 'b', 'c')
     439          self.assertEqual(test.a, 'a')
     440          self.assertEqual(test.b, 'b')
     441          self.assertEqual(test.c, 'c')
     442  
     443          test = Test()
     444          await test.method(a=1, b=2)
     445          self.assertEqual(test.a, 1)
     446          self.assertEqual(test.b, 2)
     447  
     448  
     449  class ESC[4;38;5;81mAclosingTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     450  
     451      @support.requires_docstrings
     452      def test_instance_docs(self):
     453          cm_docstring = aclosing.__doc__
     454          obj = aclosing(None)
     455          self.assertEqual(obj.__doc__, cm_docstring)
     456  
     457      @_async_test
     458      async def test_aclosing(self):
     459          state = []
     460          class ESC[4;38;5;81mC:
     461              async def aclose(self):
     462                  state.append(1)
     463          x = C()
     464          self.assertEqual(state, [])
     465          async with aclosing(x) as y:
     466              self.assertEqual(x, y)
     467          self.assertEqual(state, [1])
     468  
     469      @_async_test
     470      async def test_aclosing_error(self):
     471          state = []
     472          class ESC[4;38;5;81mC:
     473              async def aclose(self):
     474                  state.append(1)
     475          x = C()
     476          self.assertEqual(state, [])
     477          with self.assertRaises(ZeroDivisionError):
     478              async with aclosing(x) as y:
     479                  self.assertEqual(x, y)
     480                  1 / 0
     481          self.assertEqual(state, [1])
     482  
     483      @_async_test
     484      async def test_aclosing_bpo41229(self):
     485          state = []
     486  
     487          @contextmanager
     488          def sync_resource():
     489              try:
     490                  yield
     491              finally:
     492                  state.append(1)
     493  
     494          async def agenfunc():
     495              with sync_resource():
     496                  yield -1
     497                  yield -2
     498  
     499          x = agenfunc()
     500          self.assertEqual(state, [])
     501          with self.assertRaises(ZeroDivisionError):
     502              async with aclosing(x) as y:
     503                  self.assertEqual(x, y)
     504                  self.assertEqual(-1, await x.__anext__())
     505                  1 / 0
     506          self.assertEqual(state, [1])
     507  
     508  
     509  class ESC[4;38;5;81mTestAsyncExitStack(ESC[4;38;5;149mTestBaseExitStack, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     510      class ESC[4;38;5;81mSyncAsyncExitStack(ESC[4;38;5;149mAsyncExitStack):
     511          @staticmethod
     512          def run_coroutine(coro):
     513              loop = asyncio.get_event_loop_policy().get_event_loop()
     514              t = loop.create_task(coro)
     515              t.add_done_callback(lambda f: loop.stop())
     516              loop.run_forever()
     517  
     518              exc = t.exception()
     519              if not exc:
     520                  return t.result()
     521              else:
     522                  context = exc.__context__
     523  
     524                  try:
     525                      raise exc
     526                  except:
     527                      exc.__context__ = context
     528                      raise exc
     529  
     530          def close(self):
     531              return self.run_coroutine(self.aclose())
     532  
     533          def __enter__(self):
     534              return self.run_coroutine(self.__aenter__())
     535  
     536          def __exit__(self, *exc_details):
     537              return self.run_coroutine(self.__aexit__(*exc_details))
     538  
     539      exit_stack = SyncAsyncExitStack
     540      callback_error_internal_frames = [
     541          ('__exit__', 'return self.run_coroutine(self.__aexit__(*exc_details))'),
     542          ('run_coroutine', 'raise exc'),
     543          ('run_coroutine', 'raise exc'),
     544          ('__aexit__', 'raise exc_details[1]'),
     545          ('__aexit__', 'cb_suppress = cb(*exc_details)'),
     546      ]
     547  
     548      def setUp(self):
     549          self.loop = asyncio.new_event_loop()
     550          asyncio.set_event_loop(self.loop)
     551          self.addCleanup(self.loop.close)
     552          self.addCleanup(asyncio.set_event_loop_policy, None)
     553  
     554      @_async_test
     555      async def test_async_callback(self):
     556          expected = [
     557              ((), {}),
     558              ((1,), {}),
     559              ((1,2), {}),
     560              ((), dict(example=1)),
     561              ((1,), dict(example=1)),
     562              ((1,2), dict(example=1)),
     563          ]
     564          result = []
     565          async def _exit(*args, **kwds):
     566              """Test metadata propagation"""
     567              result.append((args, kwds))
     568  
     569          async with AsyncExitStack() as stack:
     570              for args, kwds in reversed(expected):
     571                  if args and kwds:
     572                      f = stack.push_async_callback(_exit, *args, **kwds)
     573                  elif args:
     574                      f = stack.push_async_callback(_exit, *args)
     575                  elif kwds:
     576                      f = stack.push_async_callback(_exit, **kwds)
     577                  else:
     578                      f = stack.push_async_callback(_exit)
     579                  self.assertIs(f, _exit)
     580              for wrapper in stack._exit_callbacks:
     581                  self.assertIs(wrapper[1].__wrapped__, _exit)
     582                  self.assertNotEqual(wrapper[1].__name__, _exit.__name__)
     583                  self.assertIsNone(wrapper[1].__doc__, _exit.__doc__)
     584  
     585          self.assertEqual(result, expected)
     586  
     587          result = []
     588          async with AsyncExitStack() as stack:
     589              with self.assertRaises(TypeError):
     590                  stack.push_async_callback(arg=1)
     591              with self.assertRaises(TypeError):
     592                  self.exit_stack.push_async_callback(arg=2)
     593              with self.assertRaises(TypeError):
     594                  stack.push_async_callback(callback=_exit, arg=3)
     595          self.assertEqual(result, [])
     596  
     597      @_async_test
     598      async def test_async_push(self):
     599          exc_raised = ZeroDivisionError
     600          async def _expect_exc(exc_type, exc, exc_tb):
     601              self.assertIs(exc_type, exc_raised)
     602          async def _suppress_exc(*exc_details):
     603              return True
     604          async def _expect_ok(exc_type, exc, exc_tb):
     605              self.assertIsNone(exc_type)
     606              self.assertIsNone(exc)
     607              self.assertIsNone(exc_tb)
     608          class ESC[4;38;5;81mExitCM(ESC[4;38;5;149mobject):
     609              def __init__(self, check_exc):
     610                  self.check_exc = check_exc
     611              async def __aenter__(self):
     612                  self.fail("Should not be called!")
     613              async def __aexit__(self, *exc_details):
     614                  await self.check_exc(*exc_details)
     615  
     616          async with self.exit_stack() as stack:
     617              stack.push_async_exit(_expect_ok)
     618              self.assertIs(stack._exit_callbacks[-1][1], _expect_ok)
     619              cm = ExitCM(_expect_ok)
     620              stack.push_async_exit(cm)
     621              self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
     622              stack.push_async_exit(_suppress_exc)
     623              self.assertIs(stack._exit_callbacks[-1][1], _suppress_exc)
     624              cm = ExitCM(_expect_exc)
     625              stack.push_async_exit(cm)
     626              self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
     627              stack.push_async_exit(_expect_exc)
     628              self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)
     629              stack.push_async_exit(_expect_exc)
     630              self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)
     631              1/0
     632  
     633      @_async_test
     634      async def test_enter_async_context(self):
     635          class ESC[4;38;5;81mTestCM(ESC[4;38;5;149mobject):
     636              async def __aenter__(self):
     637                  result.append(1)
     638              async def __aexit__(self, *exc_details):
     639                  result.append(3)
     640  
     641          result = []
     642          cm = TestCM()
     643  
     644          async with AsyncExitStack() as stack:
     645              @stack.push_async_callback  # Registered first => cleaned up last
     646              async def _exit():
     647                  result.append(4)
     648              self.assertIsNotNone(_exit)
     649              await stack.enter_async_context(cm)
     650              self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
     651              result.append(2)
     652  
     653          self.assertEqual(result, [1, 2, 3, 4])
     654  
     655      @_async_test
     656      async def test_enter_async_context_errors(self):
     657          class ESC[4;38;5;81mLacksEnterAndExit:
     658              pass
     659          class ESC[4;38;5;81mLacksEnter:
     660              async def __aexit__(self, *exc_info):
     661                  pass
     662          class ESC[4;38;5;81mLacksExit:
     663              async def __aenter__(self):
     664                  pass
     665  
     666          async with self.exit_stack() as stack:
     667              with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
     668                  await stack.enter_async_context(LacksEnterAndExit())
     669              with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
     670                  await stack.enter_async_context(LacksEnter())
     671              with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
     672                  await stack.enter_async_context(LacksExit())
     673              self.assertFalse(stack._exit_callbacks)
     674  
     675      @_async_test
     676      async def test_async_exit_exception_chaining(self):
     677          # Ensure exception chaining matches the reference behaviour
     678          async def raise_exc(exc):
     679              raise exc
     680  
     681          saved_details = None
     682          async def suppress_exc(*exc_details):
     683              nonlocal saved_details
     684              saved_details = exc_details
     685              return True
     686  
     687          try:
     688              async with self.exit_stack() as stack:
     689                  stack.push_async_callback(raise_exc, IndexError)
     690                  stack.push_async_callback(raise_exc, KeyError)
     691                  stack.push_async_callback(raise_exc, AttributeError)
     692                  stack.push_async_exit(suppress_exc)
     693                  stack.push_async_callback(raise_exc, ValueError)
     694                  1 / 0
     695          except IndexError as exc:
     696              self.assertIsInstance(exc.__context__, KeyError)
     697              self.assertIsInstance(exc.__context__.__context__, AttributeError)
     698              # Inner exceptions were suppressed
     699              self.assertIsNone(exc.__context__.__context__.__context__)
     700          else:
     701              self.fail("Expected IndexError, but no exception was raised")
     702          # Check the inner exceptions
     703          inner_exc = saved_details[1]
     704          self.assertIsInstance(inner_exc, ValueError)
     705          self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
     706  
     707      @_async_test
     708      async def test_async_exit_exception_explicit_none_context(self):
     709          # Ensure AsyncExitStack chaining matches actual nested `with` statements
     710          # regarding explicit __context__ = None.
     711  
     712          class ESC[4;38;5;81mMyException(ESC[4;38;5;149mException):
     713              pass
     714  
     715          @asynccontextmanager
     716          async def my_cm():
     717              try:
     718                  yield
     719              except BaseException:
     720                  exc = MyException()
     721                  try:
     722                      raise exc
     723                  finally:
     724                      exc.__context__ = None
     725  
     726          @asynccontextmanager
     727          async def my_cm_with_exit_stack():
     728              async with self.exit_stack() as stack:
     729                  await stack.enter_async_context(my_cm())
     730                  yield stack
     731  
     732          for cm in (my_cm, my_cm_with_exit_stack):
     733              with self.subTest():
     734                  try:
     735                      async with cm():
     736                          raise IndexError()
     737                  except MyException as exc:
     738                      self.assertIsNone(exc.__context__)
     739                  else:
     740                      self.fail("Expected IndexError, but no exception was raised")
     741  
     742      @_async_test
     743      async def test_instance_bypass_async(self):
     744          class ESC[4;38;5;81mExample(ESC[4;38;5;149mobject): pass
     745          cm = Example()
     746          cm.__aenter__ = object()
     747          cm.__aexit__ = object()
     748          stack = self.exit_stack()
     749          with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
     750              await stack.enter_async_context(cm)
     751          stack.push_async_exit(cm)
     752          self.assertIs(stack._exit_callbacks[-1][1], cm)
     753  
     754  
     755  class ESC[4;38;5;81mTestAsyncNullcontext(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     756      @_async_test
     757      async def test_async_nullcontext(self):
     758          class ESC[4;38;5;81mC:
     759              pass
     760          c = C()
     761          async with nullcontext(c) as c_in:
     762              self.assertIs(c_in, c)
     763  
     764  
     765  if __name__ == '__main__':
     766      unittest.main()