(root)/
Python-3.11.7/
Lib/
test/
test_contextlib.py
       1  """Unit tests for contextlib.py, and other context managers."""
       2  
       3  import io
       4  import os
       5  import sys
       6  import tempfile
       7  import threading
       8  import traceback
       9  import unittest
      10  from contextlib import *  # Tests __all__
      11  from test import support
      12  from test.support import os_helper
      13  import weakref
      14  
      15  
      16  class ESC[4;38;5;81mTestAbstractContextManager(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      17  
      18      def test_enter(self):
      19          class ESC[4;38;5;81mDefaultEnter(ESC[4;38;5;149mAbstractContextManager):
      20              def __exit__(self, *args):
      21                  super().__exit__(*args)
      22  
      23          manager = DefaultEnter()
      24          self.assertIs(manager.__enter__(), manager)
      25  
      26      def test_exit_is_abstract(self):
      27          class ESC[4;38;5;81mMissingExit(ESC[4;38;5;149mAbstractContextManager):
      28              pass
      29  
      30          with self.assertRaises(TypeError):
      31              MissingExit()
      32  
      33      def test_structural_subclassing(self):
      34          class ESC[4;38;5;81mManagerFromScratch:
      35              def __enter__(self):
      36                  return self
      37              def __exit__(self, exc_type, exc_value, traceback):
      38                  return None
      39  
      40          self.assertTrue(issubclass(ManagerFromScratch, AbstractContextManager))
      41  
      42          class ESC[4;38;5;81mDefaultEnter(ESC[4;38;5;149mAbstractContextManager):
      43              def __exit__(self, *args):
      44                  super().__exit__(*args)
      45  
      46          self.assertTrue(issubclass(DefaultEnter, AbstractContextManager))
      47  
      48          class ESC[4;38;5;81mNoEnter(ESC[4;38;5;149mManagerFromScratch):
      49              __enter__ = None
      50  
      51          self.assertFalse(issubclass(NoEnter, AbstractContextManager))
      52  
      53          class ESC[4;38;5;81mNoExit(ESC[4;38;5;149mManagerFromScratch):
      54              __exit__ = None
      55  
      56          self.assertFalse(issubclass(NoExit, AbstractContextManager))
      57  
      58  
      59  class ESC[4;38;5;81mContextManagerTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      60  
      61      def test_contextmanager_plain(self):
      62          state = []
      63          @contextmanager
      64          def woohoo():
      65              state.append(1)
      66              yield 42
      67              state.append(999)
      68          with woohoo() as x:
      69              self.assertEqual(state, [1])
      70              self.assertEqual(x, 42)
      71              state.append(x)
      72          self.assertEqual(state, [1, 42, 999])
      73  
      74      def test_contextmanager_finally(self):
      75          state = []
      76          @contextmanager
      77          def woohoo():
      78              state.append(1)
      79              try:
      80                  yield 42
      81              finally:
      82                  state.append(999)
      83          with self.assertRaises(ZeroDivisionError):
      84              with woohoo() as x:
      85                  self.assertEqual(state, [1])
      86                  self.assertEqual(x, 42)
      87                  state.append(x)
      88                  raise ZeroDivisionError()
      89          self.assertEqual(state, [1, 42, 999])
      90  
      91      def test_contextmanager_traceback(self):
      92          @contextmanager
      93          def f():
      94              yield
      95  
      96          try:
      97              with f():
      98                  1/0
      99          except ZeroDivisionError as e:
     100              frames = traceback.extract_tb(e.__traceback__)
     101  
     102          self.assertEqual(len(frames), 1)
     103          self.assertEqual(frames[0].name, 'test_contextmanager_traceback')
     104          self.assertEqual(frames[0].line, '1/0')
     105  
     106          # Repeat with RuntimeError (which goes through a different code path)
     107          class ESC[4;38;5;81mRuntimeErrorSubclass(ESC[4;38;5;149mRuntimeError):
     108              pass
     109  
     110          try:
     111              with f():
     112                  raise RuntimeErrorSubclass(42)
     113          except RuntimeErrorSubclass as e:
     114              frames = traceback.extract_tb(e.__traceback__)
     115  
     116          self.assertEqual(len(frames), 1)
     117          self.assertEqual(frames[0].name, 'test_contextmanager_traceback')
     118          self.assertEqual(frames[0].line, 'raise RuntimeErrorSubclass(42)')
     119  
     120          class ESC[4;38;5;81mStopIterationSubclass(ESC[4;38;5;149mStopIteration):
     121              pass
     122  
     123          for stop_exc in (
     124              StopIteration('spam'),
     125              StopIterationSubclass('spam'),
     126          ):
     127              with self.subTest(type=type(stop_exc)):
     128                  try:
     129                      with f():
     130                          raise stop_exc
     131                  except type(stop_exc) as e:
     132                      self.assertIs(e, stop_exc)
     133                      frames = traceback.extract_tb(e.__traceback__)
     134                  else:
     135                      self.fail(f'{stop_exc} was suppressed')
     136  
     137                  self.assertEqual(len(frames), 1)
     138                  self.assertEqual(frames[0].name, 'test_contextmanager_traceback')
     139                  self.assertEqual(frames[0].line, 'raise stop_exc')
     140  
     141      def test_contextmanager_no_reraise(self):
     142          @contextmanager
     143          def whee():
     144              yield
     145          ctx = whee()
     146          ctx.__enter__()
     147          # Calling __exit__ should not result in an exception
     148          self.assertFalse(ctx.__exit__(TypeError, TypeError("foo"), None))
     149  
     150      def test_contextmanager_trap_yield_after_throw(self):
     151          @contextmanager
     152          def whoo():
     153              try:
     154                  yield
     155              except:
     156                  yield
     157          ctx = whoo()
     158          ctx.__enter__()
     159          with self.assertRaises(RuntimeError):
     160              ctx.__exit__(TypeError, TypeError("foo"), None)
     161          if support.check_impl_detail(cpython=True):
     162              # The "gen" attribute is an implementation detail.
     163              self.assertFalse(ctx.gen.gi_suspended)
     164  
     165      def test_contextmanager_trap_no_yield(self):
     166          @contextmanager
     167          def whoo():
     168              if False:
     169                  yield
     170          ctx = whoo()
     171          with self.assertRaises(RuntimeError):
     172              ctx.__enter__()
     173  
     174      def test_contextmanager_trap_second_yield(self):
     175          @contextmanager
     176          def whoo():
     177              yield
     178              yield
     179          ctx = whoo()
     180          ctx.__enter__()
     181          with self.assertRaises(RuntimeError):
     182              ctx.__exit__(None, None, None)
     183          if support.check_impl_detail(cpython=True):
     184              # The "gen" attribute is an implementation detail.
     185              self.assertFalse(ctx.gen.gi_suspended)
     186  
     187      def test_contextmanager_non_normalised(self):
     188          @contextmanager
     189          def whoo():
     190              try:
     191                  yield
     192              except RuntimeError:
     193                  raise SyntaxError
     194  
     195          ctx = whoo()
     196          ctx.__enter__()
     197          with self.assertRaises(SyntaxError):
     198              ctx.__exit__(RuntimeError, None, None)
     199  
     200      def test_contextmanager_except(self):
     201          state = []
     202          @contextmanager
     203          def woohoo():
     204              state.append(1)
     205              try:
     206                  yield 42
     207              except ZeroDivisionError as e:
     208                  state.append(e.args[0])
     209                  self.assertEqual(state, [1, 42, 999])
     210          with woohoo() as x:
     211              self.assertEqual(state, [1])
     212              self.assertEqual(x, 42)
     213              state.append(x)
     214              raise ZeroDivisionError(999)
     215          self.assertEqual(state, [1, 42, 999])
     216  
     217      def test_contextmanager_except_stopiter(self):
     218          @contextmanager
     219          def woohoo():
     220              yield
     221  
     222          class ESC[4;38;5;81mStopIterationSubclass(ESC[4;38;5;149mStopIteration):
     223              pass
     224  
     225          for stop_exc in (StopIteration('spam'), StopIterationSubclass('spam')):
     226              with self.subTest(type=type(stop_exc)):
     227                  try:
     228                      with woohoo():
     229                          raise stop_exc
     230                  except Exception as ex:
     231                      self.assertIs(ex, stop_exc)
     232                  else:
     233                      self.fail(f'{stop_exc} was suppressed')
     234  
     235      def test_contextmanager_except_pep479(self):
     236          code = """\
     237  from __future__ import generator_stop
     238  from contextlib import contextmanager
     239  @contextmanager
     240  def woohoo():
     241      yield
     242  """
     243          locals = {}
     244          exec(code, locals, locals)
     245          woohoo = locals['woohoo']
     246  
     247          stop_exc = StopIteration('spam')
     248          try:
     249              with woohoo():
     250                  raise stop_exc
     251          except Exception as ex:
     252              self.assertIs(ex, stop_exc)
     253          else:
     254              self.fail('StopIteration was suppressed')
     255  
     256      def test_contextmanager_do_not_unchain_non_stopiteration_exceptions(self):
     257          @contextmanager
     258          def test_issue29692():
     259              try:
     260                  yield
     261              except Exception as exc:
     262                  raise RuntimeError('issue29692:Chained') from exc
     263          try:
     264              with test_issue29692():
     265                  raise ZeroDivisionError
     266          except Exception as ex:
     267              self.assertIs(type(ex), RuntimeError)
     268              self.assertEqual(ex.args[0], 'issue29692:Chained')
     269              self.assertIsInstance(ex.__cause__, ZeroDivisionError)
     270  
     271          try:
     272              with test_issue29692():
     273                  raise StopIteration('issue29692:Unchained')
     274          except Exception as ex:
     275              self.assertIs(type(ex), StopIteration)
     276              self.assertEqual(ex.args[0], 'issue29692:Unchained')
     277              self.assertIsNone(ex.__cause__)
     278  
     279      def test_contextmanager_wrap_runtimeerror(self):
     280          @contextmanager
     281          def woohoo():
     282              try:
     283                  yield
     284              except Exception as exc:
     285                  raise RuntimeError(f'caught {exc}') from exc
     286  
     287          with self.assertRaises(RuntimeError):
     288              with woohoo():
     289                  1 / 0
     290  
     291          # If the context manager wrapped StopIteration in a RuntimeError,
     292          # we also unwrap it, because we can't tell whether the wrapping was
     293          # done by the generator machinery or by the generator itself.
     294          with self.assertRaises(StopIteration):
     295              with woohoo():
     296                  raise StopIteration
     297  
     298      def _create_contextmanager_attribs(self):
     299          def attribs(**kw):
     300              def decorate(func):
     301                  for k,v in kw.items():
     302                      setattr(func,k,v)
     303                  return func
     304              return decorate
     305          @contextmanager
     306          @attribs(foo='bar')
     307          def baz(spam):
     308              """Whee!"""
     309              yield
     310          return baz
     311  
     312      def test_contextmanager_attribs(self):
     313          baz = self._create_contextmanager_attribs()
     314          self.assertEqual(baz.__name__,'baz')
     315          self.assertEqual(baz.foo, 'bar')
     316  
     317      @support.requires_docstrings
     318      def test_contextmanager_doc_attrib(self):
     319          baz = self._create_contextmanager_attribs()
     320          self.assertEqual(baz.__doc__, "Whee!")
     321  
     322      @support.requires_docstrings
     323      def test_instance_docstring_given_cm_docstring(self):
     324          baz = self._create_contextmanager_attribs()(None)
     325          self.assertEqual(baz.__doc__, "Whee!")
     326  
     327      def test_keywords(self):
     328          # Ensure no keyword arguments are inhibited
     329          @contextmanager
     330          def woohoo(self, func, args, kwds):
     331              yield (self, func, args, kwds)
     332          with woohoo(self=11, func=22, args=33, kwds=44) as target:
     333              self.assertEqual(target, (11, 22, 33, 44))
     334  
     335      def test_nokeepref(self):
     336          class ESC[4;38;5;81mA:
     337              pass
     338  
     339          @contextmanager
     340          def woohoo(a, b):
     341              a = weakref.ref(a)
     342              b = weakref.ref(b)
     343              # Allow test to work with a non-refcounted GC
     344              support.gc_collect()
     345              self.assertIsNone(a())
     346              self.assertIsNone(b())
     347              yield
     348  
     349          with woohoo(A(), b=A()):
     350              pass
     351  
     352      def test_param_errors(self):
     353          @contextmanager
     354          def woohoo(a, *, b):
     355              yield
     356  
     357          with self.assertRaises(TypeError):
     358              woohoo()
     359          with self.assertRaises(TypeError):
     360              woohoo(3, 5)
     361          with self.assertRaises(TypeError):
     362              woohoo(b=3)
     363  
     364      def test_recursive(self):
     365          depth = 0
     366          ncols = 0
     367          @contextmanager
     368          def woohoo():
     369              nonlocal ncols
     370              ncols += 1
     371              nonlocal depth
     372              before = depth
     373              depth += 1
     374              yield
     375              depth -= 1
     376              self.assertEqual(depth, before)
     377  
     378          @woohoo()
     379          def recursive():
     380              if depth < 10:
     381                  recursive()
     382  
     383          recursive()
     384          self.assertEqual(ncols, 10)
     385          self.assertEqual(depth, 0)
     386  
     387  
     388  class ESC[4;38;5;81mClosingTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     389  
     390      @support.requires_docstrings
     391      def test_instance_docs(self):
     392          # Issue 19330: ensure context manager instances have good docstrings
     393          cm_docstring = closing.__doc__
     394          obj = closing(None)
     395          self.assertEqual(obj.__doc__, cm_docstring)
     396  
     397      def test_closing(self):
     398          state = []
     399          class ESC[4;38;5;81mC:
     400              def close(self):
     401                  state.append(1)
     402          x = C()
     403          self.assertEqual(state, [])
     404          with closing(x) as y:
     405              self.assertEqual(x, y)
     406          self.assertEqual(state, [1])
     407  
     408      def test_closing_error(self):
     409          state = []
     410          class ESC[4;38;5;81mC:
     411              def close(self):
     412                  state.append(1)
     413          x = C()
     414          self.assertEqual(state, [])
     415          with self.assertRaises(ZeroDivisionError):
     416              with closing(x) as y:
     417                  self.assertEqual(x, y)
     418                  1 / 0
     419          self.assertEqual(state, [1])
     420  
     421  
     422  class ESC[4;38;5;81mNullcontextTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     423      def test_nullcontext(self):
     424          class ESC[4;38;5;81mC:
     425              pass
     426          c = C()
     427          with nullcontext(c) as c_in:
     428              self.assertIs(c_in, c)
     429  
     430  
     431  class ESC[4;38;5;81mFileContextTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     432  
     433      def testWithOpen(self):
     434          tfn = tempfile.mktemp()
     435          try:
     436              f = None
     437              with open(tfn, "w", encoding="utf-8") as f:
     438                  self.assertFalse(f.closed)
     439                  f.write("Booh\n")
     440              self.assertTrue(f.closed)
     441              f = None
     442              with self.assertRaises(ZeroDivisionError):
     443                  with open(tfn, "r", encoding="utf-8") as f:
     444                      self.assertFalse(f.closed)
     445                      self.assertEqual(f.read(), "Booh\n")
     446                      1 / 0
     447              self.assertTrue(f.closed)
     448          finally:
     449              os_helper.unlink(tfn)
     450  
     451  class ESC[4;38;5;81mLockContextTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     452  
     453      def boilerPlate(self, lock, locked):
     454          self.assertFalse(locked())
     455          with lock:
     456              self.assertTrue(locked())
     457          self.assertFalse(locked())
     458          with self.assertRaises(ZeroDivisionError):
     459              with lock:
     460                  self.assertTrue(locked())
     461                  1 / 0
     462          self.assertFalse(locked())
     463  
     464      def testWithLock(self):
     465          lock = threading.Lock()
     466          self.boilerPlate(lock, lock.locked)
     467  
     468      def testWithRLock(self):
     469          lock = threading.RLock()
     470          self.boilerPlate(lock, lock._is_owned)
     471  
     472      def testWithCondition(self):
     473          lock = threading.Condition()
     474          def locked():
     475              return lock._is_owned()
     476          self.boilerPlate(lock, locked)
     477  
     478      def testWithSemaphore(self):
     479          lock = threading.Semaphore()
     480          def locked():
     481              if lock.acquire(False):
     482                  lock.release()
     483                  return False
     484              else:
     485                  return True
     486          self.boilerPlate(lock, locked)
     487  
     488      def testWithBoundedSemaphore(self):
     489          lock = threading.BoundedSemaphore()
     490          def locked():
     491              if lock.acquire(False):
     492                  lock.release()
     493                  return False
     494              else:
     495                  return True
     496          self.boilerPlate(lock, locked)
     497  
     498  
     499  class ESC[4;38;5;81mmycontext(ESC[4;38;5;149mContextDecorator):
     500      """Example decoration-compatible context manager for testing"""
     501      started = False
     502      exc = None
     503      catch = False
     504  
     505      def __enter__(self):
     506          self.started = True
     507          return self
     508  
     509      def __exit__(self, *exc):
     510          self.exc = exc
     511          return self.catch
     512  
     513  
     514  class ESC[4;38;5;81mTestContextDecorator(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     515  
     516      @support.requires_docstrings
     517      def test_instance_docs(self):
     518          # Issue 19330: ensure context manager instances have good docstrings
     519          cm_docstring = mycontext.__doc__
     520          obj = mycontext()
     521          self.assertEqual(obj.__doc__, cm_docstring)
     522  
     523      def test_contextdecorator(self):
     524          context = mycontext()
     525          with context as result:
     526              self.assertIs(result, context)
     527              self.assertTrue(context.started)
     528  
     529          self.assertEqual(context.exc, (None, None, None))
     530  
     531  
     532      def test_contextdecorator_with_exception(self):
     533          context = mycontext()
     534  
     535          with self.assertRaisesRegex(NameError, 'foo'):
     536              with context:
     537                  raise NameError('foo')
     538          self.assertIsNotNone(context.exc)
     539          self.assertIs(context.exc[0], NameError)
     540  
     541          context = mycontext()
     542          context.catch = True
     543          with context:
     544              raise NameError('foo')
     545          self.assertIsNotNone(context.exc)
     546          self.assertIs(context.exc[0], NameError)
     547  
     548  
     549      def test_decorator(self):
     550          context = mycontext()
     551  
     552          @context
     553          def test():
     554              self.assertIsNone(context.exc)
     555              self.assertTrue(context.started)
     556          test()
     557          self.assertEqual(context.exc, (None, None, None))
     558  
     559  
     560      def test_decorator_with_exception(self):
     561          context = mycontext()
     562  
     563          @context
     564          def test():
     565              self.assertIsNone(context.exc)
     566              self.assertTrue(context.started)
     567              raise NameError('foo')
     568  
     569          with self.assertRaisesRegex(NameError, 'foo'):
     570              test()
     571          self.assertIsNotNone(context.exc)
     572          self.assertIs(context.exc[0], NameError)
     573  
     574  
     575      def test_decorating_method(self):
     576          context = mycontext()
     577  
     578          class ESC[4;38;5;81mTest(ESC[4;38;5;149mobject):
     579  
     580              @context
     581              def method(self, a, b, c=None):
     582                  self.a = a
     583                  self.b = b
     584                  self.c = c
     585  
     586          # these tests are for argument passing when used as a decorator
     587          test = Test()
     588          test.method(1, 2)
     589          self.assertEqual(test.a, 1)
     590          self.assertEqual(test.b, 2)
     591          self.assertEqual(test.c, None)
     592  
     593          test = Test()
     594          test.method('a', 'b', 'c')
     595          self.assertEqual(test.a, 'a')
     596          self.assertEqual(test.b, 'b')
     597          self.assertEqual(test.c, 'c')
     598  
     599          test = Test()
     600          test.method(a=1, b=2)
     601          self.assertEqual(test.a, 1)
     602          self.assertEqual(test.b, 2)
     603  
     604  
     605      def test_typo_enter(self):
     606          class ESC[4;38;5;81mmycontext(ESC[4;38;5;149mContextDecorator):
     607              def __unter__(self):
     608                  pass
     609              def __exit__(self, *exc):
     610                  pass
     611  
     612          with self.assertRaisesRegex(TypeError, 'the context manager'):
     613              with mycontext():
     614                  pass
     615  
     616  
     617      def test_typo_exit(self):
     618          class ESC[4;38;5;81mmycontext(ESC[4;38;5;149mContextDecorator):
     619              def __enter__(self):
     620                  pass
     621              def __uxit__(self, *exc):
     622                  pass
     623  
     624          with self.assertRaisesRegex(TypeError, 'the context manager.*__exit__'):
     625              with mycontext():
     626                  pass
     627  
     628  
     629      def test_contextdecorator_as_mixin(self):
     630          class ESC[4;38;5;81msomecontext(ESC[4;38;5;149mobject):
     631              started = False
     632              exc = None
     633  
     634              def __enter__(self):
     635                  self.started = True
     636                  return self
     637  
     638              def __exit__(self, *exc):
     639                  self.exc = exc
     640  
     641          class ESC[4;38;5;81mmycontext(ESC[4;38;5;149msomecontext, ESC[4;38;5;149mContextDecorator):
     642              pass
     643  
     644          context = mycontext()
     645          @context
     646          def test():
     647              self.assertIsNone(context.exc)
     648              self.assertTrue(context.started)
     649          test()
     650          self.assertEqual(context.exc, (None, None, None))
     651  
     652  
     653      def test_contextmanager_as_decorator(self):
     654          @contextmanager
     655          def woohoo(y):
     656              state.append(y)
     657              yield
     658              state.append(999)
     659  
     660          state = []
     661          @woohoo(1)
     662          def test(x):
     663              self.assertEqual(state, [1])
     664              state.append(x)
     665          test('something')
     666          self.assertEqual(state, [1, 'something', 999])
     667  
     668          # Issue #11647: Ensure the decorated function is 'reusable'
     669          state = []
     670          test('something else')
     671          self.assertEqual(state, [1, 'something else', 999])
     672  
     673  
     674  class ESC[4;38;5;81mTestBaseExitStack:
     675      exit_stack = None
     676  
     677      @support.requires_docstrings
     678      def test_instance_docs(self):
     679          # Issue 19330: ensure context manager instances have good docstrings
     680          cm_docstring = self.exit_stack.__doc__
     681          obj = self.exit_stack()
     682          self.assertEqual(obj.__doc__, cm_docstring)
     683  
     684      def test_no_resources(self):
     685          with self.exit_stack():
     686              pass
     687  
     688      def test_callback(self):
     689          expected = [
     690              ((), {}),
     691              ((1,), {}),
     692              ((1,2), {}),
     693              ((), dict(example=1)),
     694              ((1,), dict(example=1)),
     695              ((1,2), dict(example=1)),
     696              ((1,2), dict(self=3, callback=4)),
     697          ]
     698          result = []
     699          def _exit(*args, **kwds):
     700              """Test metadata propagation"""
     701              result.append((args, kwds))
     702          with self.exit_stack() as stack:
     703              for args, kwds in reversed(expected):
     704                  if args and kwds:
     705                      f = stack.callback(_exit, *args, **kwds)
     706                  elif args:
     707                      f = stack.callback(_exit, *args)
     708                  elif kwds:
     709                      f = stack.callback(_exit, **kwds)
     710                  else:
     711                      f = stack.callback(_exit)
     712                  self.assertIs(f, _exit)
     713              for wrapper in stack._exit_callbacks:
     714                  self.assertIs(wrapper[1].__wrapped__, _exit)
     715                  self.assertNotEqual(wrapper[1].__name__, _exit.__name__)
     716                  self.assertIsNone(wrapper[1].__doc__, _exit.__doc__)
     717          self.assertEqual(result, expected)
     718  
     719          result = []
     720          with self.exit_stack() as stack:
     721              with self.assertRaises(TypeError):
     722                  stack.callback(arg=1)
     723              with self.assertRaises(TypeError):
     724                  self.exit_stack.callback(arg=2)
     725              with self.assertRaises(TypeError):
     726                  stack.callback(callback=_exit, arg=3)
     727          self.assertEqual(result, [])
     728  
     729      def test_push(self):
     730          exc_raised = ZeroDivisionError
     731          def _expect_exc(exc_type, exc, exc_tb):
     732              self.assertIs(exc_type, exc_raised)
     733          def _suppress_exc(*exc_details):
     734              return True
     735          def _expect_ok(exc_type, exc, exc_tb):
     736              self.assertIsNone(exc_type)
     737              self.assertIsNone(exc)
     738              self.assertIsNone(exc_tb)
     739          class ESC[4;38;5;81mExitCM(ESC[4;38;5;149mobject):
     740              def __init__(self, check_exc):
     741                  self.check_exc = check_exc
     742              def __enter__(self):
     743                  self.fail("Should not be called!")
     744              def __exit__(self, *exc_details):
     745                  self.check_exc(*exc_details)
     746          with self.exit_stack() as stack:
     747              stack.push(_expect_ok)
     748              self.assertIs(stack._exit_callbacks[-1][1], _expect_ok)
     749              cm = ExitCM(_expect_ok)
     750              stack.push(cm)
     751              self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
     752              stack.push(_suppress_exc)
     753              self.assertIs(stack._exit_callbacks[-1][1], _suppress_exc)
     754              cm = ExitCM(_expect_exc)
     755              stack.push(cm)
     756              self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
     757              stack.push(_expect_exc)
     758              self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)
     759              stack.push(_expect_exc)
     760              self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)
     761              1/0
     762  
     763      def test_enter_context(self):
     764          class ESC[4;38;5;81mTestCM(ESC[4;38;5;149mobject):
     765              def __enter__(self):
     766                  result.append(1)
     767              def __exit__(self, *exc_details):
     768                  result.append(3)
     769  
     770          result = []
     771          cm = TestCM()
     772          with self.exit_stack() as stack:
     773              @stack.callback  # Registered first => cleaned up last
     774              def _exit():
     775                  result.append(4)
     776              self.assertIsNotNone(_exit)
     777              stack.enter_context(cm)
     778              self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
     779              result.append(2)
     780          self.assertEqual(result, [1, 2, 3, 4])
     781  
     782      def test_enter_context_errors(self):
     783          class ESC[4;38;5;81mLacksEnterAndExit:
     784              pass
     785          class ESC[4;38;5;81mLacksEnter:
     786              def __exit__(self, *exc_info):
     787                  pass
     788          class ESC[4;38;5;81mLacksExit:
     789              def __enter__(self):
     790                  pass
     791  
     792          with self.exit_stack() as stack:
     793              with self.assertRaisesRegex(TypeError, 'the context manager'):
     794                  stack.enter_context(LacksEnterAndExit())
     795              with self.assertRaisesRegex(TypeError, 'the context manager'):
     796                  stack.enter_context(LacksEnter())
     797              with self.assertRaisesRegex(TypeError, 'the context manager'):
     798                  stack.enter_context(LacksExit())
     799              self.assertFalse(stack._exit_callbacks)
     800  
     801      def test_close(self):
     802          result = []
     803          with self.exit_stack() as stack:
     804              @stack.callback
     805              def _exit():
     806                  result.append(1)
     807              self.assertIsNotNone(_exit)
     808              stack.close()
     809              result.append(2)
     810          self.assertEqual(result, [1, 2])
     811  
     812      def test_pop_all(self):
     813          result = []
     814          with self.exit_stack() as stack:
     815              @stack.callback
     816              def _exit():
     817                  result.append(3)
     818              self.assertIsNotNone(_exit)
     819              new_stack = stack.pop_all()
     820              result.append(1)
     821          result.append(2)
     822          new_stack.close()
     823          self.assertEqual(result, [1, 2, 3])
     824  
     825      def test_exit_raise(self):
     826          with self.assertRaises(ZeroDivisionError):
     827              with self.exit_stack() as stack:
     828                  stack.push(lambda *exc: False)
     829                  1/0
     830  
     831      def test_exit_suppress(self):
     832          with self.exit_stack() as stack:
     833              stack.push(lambda *exc: True)
     834              1/0
     835  
     836      def test_exit_exception_traceback(self):
     837          # This test captures the current behavior of ExitStack so that we know
     838          # if we ever unintendedly change it. It is not a statement of what the
     839          # desired behavior is (for instance, we may want to remove some of the
     840          # internal contextlib frames).
     841  
     842          def raise_exc(exc):
     843              raise exc
     844  
     845          try:
     846              with self.exit_stack() as stack:
     847                  stack.callback(raise_exc, ValueError)
     848                  1/0
     849          except ValueError as e:
     850              exc = e
     851  
     852          self.assertIsInstance(exc, ValueError)
     853          ve_frames = traceback.extract_tb(exc.__traceback__)
     854          expected = \
     855              [('test_exit_exception_traceback', 'with self.exit_stack() as stack:')] + \
     856              self.callback_error_internal_frames + \
     857              [('_exit_wrapper', 'callback(*args, **kwds)'),
     858               ('raise_exc', 'raise exc')]
     859  
     860          self.assertEqual(
     861              [(f.name, f.line) for f in ve_frames], expected)
     862  
     863          self.assertIsInstance(exc.__context__, ZeroDivisionError)
     864          zde_frames = traceback.extract_tb(exc.__context__.__traceback__)
     865          self.assertEqual([(f.name, f.line) for f in zde_frames],
     866                           [('test_exit_exception_traceback', '1/0')])
     867  
     868      def test_exit_exception_chaining_reference(self):
     869          # Sanity check to make sure that ExitStack chaining matches
     870          # actual nested with statements
     871          class ESC[4;38;5;81mRaiseExc:
     872              def __init__(self, exc):
     873                  self.exc = exc
     874              def __enter__(self):
     875                  return self
     876              def __exit__(self, *exc_details):
     877                  raise self.exc
     878  
     879          class ESC[4;38;5;81mRaiseExcWithContext:
     880              def __init__(self, outer, inner):
     881                  self.outer = outer
     882                  self.inner = inner
     883              def __enter__(self):
     884                  return self
     885              def __exit__(self, *exc_details):
     886                  try:
     887                      raise self.inner
     888                  except:
     889                      raise self.outer
     890  
     891          class ESC[4;38;5;81mSuppressExc:
     892              def __enter__(self):
     893                  return self
     894              def __exit__(self, *exc_details):
     895                  type(self).saved_details = exc_details
     896                  return True
     897  
     898          try:
     899              with RaiseExc(IndexError):
     900                  with RaiseExcWithContext(KeyError, AttributeError):
     901                      with SuppressExc():
     902                          with RaiseExc(ValueError):
     903                              1 / 0
     904          except IndexError as exc:
     905              self.assertIsInstance(exc.__context__, KeyError)
     906              self.assertIsInstance(exc.__context__.__context__, AttributeError)
     907              # Inner exceptions were suppressed
     908              self.assertIsNone(exc.__context__.__context__.__context__)
     909          else:
     910              self.fail("Expected IndexError, but no exception was raised")
     911          # Check the inner exceptions
     912          inner_exc = SuppressExc.saved_details[1]
     913          self.assertIsInstance(inner_exc, ValueError)
     914          self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
     915  
     916      def test_exit_exception_chaining(self):
     917          # Ensure exception chaining matches the reference behaviour
     918          def raise_exc(exc):
     919              raise exc
     920  
     921          saved_details = None
     922          def suppress_exc(*exc_details):
     923              nonlocal saved_details
     924              saved_details = exc_details
     925              return True
     926  
     927          try:
     928              with self.exit_stack() as stack:
     929                  stack.callback(raise_exc, IndexError)
     930                  stack.callback(raise_exc, KeyError)
     931                  stack.callback(raise_exc, AttributeError)
     932                  stack.push(suppress_exc)
     933                  stack.callback(raise_exc, ValueError)
     934                  1 / 0
     935          except IndexError as exc:
     936              self.assertIsInstance(exc.__context__, KeyError)
     937              self.assertIsInstance(exc.__context__.__context__, AttributeError)
     938              # Inner exceptions were suppressed
     939              self.assertIsNone(exc.__context__.__context__.__context__)
     940          else:
     941              self.fail("Expected IndexError, but no exception was raised")
     942          # Check the inner exceptions
     943          inner_exc = saved_details[1]
     944          self.assertIsInstance(inner_exc, ValueError)
     945          self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
     946  
     947      def test_exit_exception_explicit_none_context(self):
     948          # Ensure ExitStack chaining matches actual nested `with` statements
     949          # regarding explicit __context__ = None.
     950  
     951          class ESC[4;38;5;81mMyException(ESC[4;38;5;149mException):
     952              pass
     953  
     954          @contextmanager
     955          def my_cm():
     956              try:
     957                  yield
     958              except BaseException:
     959                  exc = MyException()
     960                  try:
     961                      raise exc
     962                  finally:
     963                      exc.__context__ = None
     964  
     965          @contextmanager
     966          def my_cm_with_exit_stack():
     967              with self.exit_stack() as stack:
     968                  stack.enter_context(my_cm())
     969                  yield stack
     970  
     971          for cm in (my_cm, my_cm_with_exit_stack):
     972              with self.subTest():
     973                  try:
     974                      with cm():
     975                          raise IndexError()
     976                  except MyException as exc:
     977                      self.assertIsNone(exc.__context__)
     978                  else:
     979                      self.fail("Expected IndexError, but no exception was raised")
     980  
     981      def test_exit_exception_non_suppressing(self):
     982          # http://bugs.python.org/issue19092
     983          def raise_exc(exc):
     984              raise exc
     985  
     986          def suppress_exc(*exc_details):
     987              return True
     988  
     989          try:
     990              with self.exit_stack() as stack:
     991                  stack.callback(lambda: None)
     992                  stack.callback(raise_exc, IndexError)
     993          except Exception as exc:
     994              self.assertIsInstance(exc, IndexError)
     995          else:
     996              self.fail("Expected IndexError, but no exception was raised")
     997  
     998          try:
     999              with self.exit_stack() as stack:
    1000                  stack.callback(raise_exc, KeyError)
    1001                  stack.push(suppress_exc)
    1002                  stack.callback(raise_exc, IndexError)
    1003          except Exception as exc:
    1004              self.assertIsInstance(exc, KeyError)
    1005          else:
    1006              self.fail("Expected KeyError, but no exception was raised")
    1007  
    1008      def test_exit_exception_with_correct_context(self):
    1009          # http://bugs.python.org/issue20317
    1010          @contextmanager
    1011          def gets_the_context_right(exc):
    1012              try:
    1013                  yield
    1014              finally:
    1015                  raise exc
    1016  
    1017          exc1 = Exception(1)
    1018          exc2 = Exception(2)
    1019          exc3 = Exception(3)
    1020          exc4 = Exception(4)
    1021  
    1022          # The contextmanager already fixes the context, so prior to the
    1023          # fix, ExitStack would try to fix it *again* and get into an
    1024          # infinite self-referential loop
    1025          try:
    1026              with self.exit_stack() as stack:
    1027                  stack.enter_context(gets_the_context_right(exc4))
    1028                  stack.enter_context(gets_the_context_right(exc3))
    1029                  stack.enter_context(gets_the_context_right(exc2))
    1030                  raise exc1
    1031          except Exception as exc:
    1032              self.assertIs(exc, exc4)
    1033              self.assertIs(exc.__context__, exc3)
    1034              self.assertIs(exc.__context__.__context__, exc2)
    1035              self.assertIs(exc.__context__.__context__.__context__, exc1)
    1036              self.assertIsNone(
    1037                         exc.__context__.__context__.__context__.__context__)
    1038  
    1039      def test_exit_exception_with_existing_context(self):
    1040          # Addresses a lack of test coverage discovered after checking in a
    1041          # fix for issue 20317 that still contained debugging code.
    1042          def raise_nested(inner_exc, outer_exc):
    1043              try:
    1044                  raise inner_exc
    1045              finally:
    1046                  raise outer_exc
    1047          exc1 = Exception(1)
    1048          exc2 = Exception(2)
    1049          exc3 = Exception(3)
    1050          exc4 = Exception(4)
    1051          exc5 = Exception(5)
    1052          try:
    1053              with self.exit_stack() as stack:
    1054                  stack.callback(raise_nested, exc4, exc5)
    1055                  stack.callback(raise_nested, exc2, exc3)
    1056                  raise exc1
    1057          except Exception as exc:
    1058              self.assertIs(exc, exc5)
    1059              self.assertIs(exc.__context__, exc4)
    1060              self.assertIs(exc.__context__.__context__, exc3)
    1061              self.assertIs(exc.__context__.__context__.__context__, exc2)
    1062              self.assertIs(
    1063                   exc.__context__.__context__.__context__.__context__, exc1)
    1064              self.assertIsNone(
    1065                  exc.__context__.__context__.__context__.__context__.__context__)
    1066  
    1067      def test_body_exception_suppress(self):
    1068          def suppress_exc(*exc_details):
    1069              return True
    1070          try:
    1071              with self.exit_stack() as stack:
    1072                  stack.push(suppress_exc)
    1073                  1/0
    1074          except IndexError as exc:
    1075              self.fail("Expected no exception, got IndexError")
    1076  
    1077      def test_exit_exception_chaining_suppress(self):
    1078          with self.exit_stack() as stack:
    1079              stack.push(lambda *exc: True)
    1080              stack.push(lambda *exc: 1/0)
    1081              stack.push(lambda *exc: {}[1])
    1082  
    1083      def test_excessive_nesting(self):
    1084          # The original implementation would die with RecursionError here
    1085          with self.exit_stack() as stack:
    1086              for i in range(10000):
    1087                  stack.callback(int)
    1088  
    1089      def test_instance_bypass(self):
    1090          class ESC[4;38;5;81mExample(ESC[4;38;5;149mobject): pass
    1091          cm = Example()
    1092          cm.__enter__ = object()
    1093          cm.__exit__ = object()
    1094          stack = self.exit_stack()
    1095          with self.assertRaisesRegex(TypeError, 'the context manager'):
    1096              stack.enter_context(cm)
    1097          stack.push(cm)
    1098          self.assertIs(stack._exit_callbacks[-1][1], cm)
    1099  
    1100      def test_dont_reraise_RuntimeError(self):
    1101          # https://bugs.python.org/issue27122
    1102          class ESC[4;38;5;81mUniqueException(ESC[4;38;5;149mException): pass
    1103          class ESC[4;38;5;81mUniqueRuntimeError(ESC[4;38;5;149mRuntimeError): pass
    1104  
    1105          @contextmanager
    1106          def second():
    1107              try:
    1108                  yield 1
    1109              except Exception as exc:
    1110                  raise UniqueException("new exception") from exc
    1111  
    1112          @contextmanager
    1113          def first():
    1114              try:
    1115                  yield 1
    1116              except Exception as exc:
    1117                  raise exc
    1118  
    1119          # The UniqueRuntimeError should be caught by second()'s exception
    1120          # handler which chain raised a new UniqueException.
    1121          with self.assertRaises(UniqueException) as err_ctx:
    1122              with self.exit_stack() as es_ctx:
    1123                  es_ctx.enter_context(second())
    1124                  es_ctx.enter_context(first())
    1125                  raise UniqueRuntimeError("please no infinite loop.")
    1126  
    1127          exc = err_ctx.exception
    1128          self.assertIsInstance(exc, UniqueException)
    1129          self.assertIsInstance(exc.__context__, UniqueRuntimeError)
    1130          self.assertIsNone(exc.__context__.__context__)
    1131          self.assertIsNone(exc.__context__.__cause__)
    1132          self.assertIs(exc.__cause__, exc.__context__)
    1133  
    1134  
    1135  class ESC[4;38;5;81mTestExitStack(ESC[4;38;5;149mTestBaseExitStack, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1136      exit_stack = ExitStack
    1137      callback_error_internal_frames = [
    1138          ('__exit__', 'raise exc_details[1]'),
    1139          ('__exit__', 'if cb(*exc_details):'),
    1140      ]
    1141  
    1142  
    1143  class ESC[4;38;5;81mTestRedirectStream:
    1144  
    1145      redirect_stream = None
    1146      orig_stream = None
    1147  
    1148      @support.requires_docstrings
    1149      def test_instance_docs(self):
    1150          # Issue 19330: ensure context manager instances have good docstrings
    1151          cm_docstring = self.redirect_stream.__doc__
    1152          obj = self.redirect_stream(None)
    1153          self.assertEqual(obj.__doc__, cm_docstring)
    1154  
    1155      def test_no_redirect_in_init(self):
    1156          orig_stdout = getattr(sys, self.orig_stream)
    1157          self.redirect_stream(None)
    1158          self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
    1159  
    1160      def test_redirect_to_string_io(self):
    1161          f = io.StringIO()
    1162          msg = "Consider an API like help(), which prints directly to stdout"
    1163          orig_stdout = getattr(sys, self.orig_stream)
    1164          with self.redirect_stream(f):
    1165              print(msg, file=getattr(sys, self.orig_stream))
    1166          self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
    1167          s = f.getvalue().strip()
    1168          self.assertEqual(s, msg)
    1169  
    1170      def test_enter_result_is_target(self):
    1171          f = io.StringIO()
    1172          with self.redirect_stream(f) as enter_result:
    1173              self.assertIs(enter_result, f)
    1174  
    1175      def test_cm_is_reusable(self):
    1176          f = io.StringIO()
    1177          write_to_f = self.redirect_stream(f)
    1178          orig_stdout = getattr(sys, self.orig_stream)
    1179          with write_to_f:
    1180              print("Hello", end=" ", file=getattr(sys, self.orig_stream))
    1181          with write_to_f:
    1182              print("World!", file=getattr(sys, self.orig_stream))
    1183          self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
    1184          s = f.getvalue()
    1185          self.assertEqual(s, "Hello World!\n")
    1186  
    1187      def test_cm_is_reentrant(self):
    1188          f = io.StringIO()
    1189          write_to_f = self.redirect_stream(f)
    1190          orig_stdout = getattr(sys, self.orig_stream)
    1191          with write_to_f:
    1192              print("Hello", end=" ", file=getattr(sys, self.orig_stream))
    1193              with write_to_f:
    1194                  print("World!", file=getattr(sys, self.orig_stream))
    1195          self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
    1196          s = f.getvalue()
    1197          self.assertEqual(s, "Hello World!\n")
    1198  
    1199  
    1200  class ESC[4;38;5;81mTestRedirectStdout(ESC[4;38;5;149mTestRedirectStream, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1201  
    1202      redirect_stream = redirect_stdout
    1203      orig_stream = "stdout"
    1204  
    1205  
    1206  class ESC[4;38;5;81mTestRedirectStderr(ESC[4;38;5;149mTestRedirectStream, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1207  
    1208      redirect_stream = redirect_stderr
    1209      orig_stream = "stderr"
    1210  
    1211  
    1212  class ESC[4;38;5;81mTestSuppress(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1213  
    1214      @support.requires_docstrings
    1215      def test_instance_docs(self):
    1216          # Issue 19330: ensure context manager instances have good docstrings
    1217          cm_docstring = suppress.__doc__
    1218          obj = suppress()
    1219          self.assertEqual(obj.__doc__, cm_docstring)
    1220  
    1221      def test_no_result_from_enter(self):
    1222          with suppress(ValueError) as enter_result:
    1223              self.assertIsNone(enter_result)
    1224  
    1225      def test_no_exception(self):
    1226          with suppress(ValueError):
    1227              self.assertEqual(pow(2, 5), 32)
    1228  
    1229      def test_exact_exception(self):
    1230          with suppress(TypeError):
    1231              len(5)
    1232  
    1233      def test_exception_hierarchy(self):
    1234          with suppress(LookupError):
    1235              'Hello'[50]
    1236  
    1237      def test_other_exception(self):
    1238          with self.assertRaises(ZeroDivisionError):
    1239              with suppress(TypeError):
    1240                  1/0
    1241  
    1242      def test_no_args(self):
    1243          with self.assertRaises(ZeroDivisionError):
    1244              with suppress():
    1245                  1/0
    1246  
    1247      def test_multiple_exception_args(self):
    1248          with suppress(ZeroDivisionError, TypeError):
    1249              1/0
    1250          with suppress(ZeroDivisionError, TypeError):
    1251              len(5)
    1252  
    1253      def test_cm_is_reentrant(self):
    1254          ignore_exceptions = suppress(Exception)
    1255          with ignore_exceptions:
    1256              pass
    1257          with ignore_exceptions:
    1258              len(5)
    1259          with ignore_exceptions:
    1260              with ignore_exceptions: # Check nested usage
    1261                  len(5)
    1262              outer_continued = True
    1263              1/0
    1264          self.assertTrue(outer_continued)
    1265  
    1266  
    1267  class ESC[4;38;5;81mTestChdir(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1268      def make_relative_path(self, *parts):
    1269          return os.path.join(
    1270              os.path.dirname(os.path.realpath(__file__)),
    1271              *parts,
    1272          )
    1273  
    1274      def test_simple(self):
    1275          old_cwd = os.getcwd()
    1276          target = self.make_relative_path('data')
    1277          self.assertNotEqual(old_cwd, target)
    1278  
    1279          with chdir(target):
    1280              self.assertEqual(os.getcwd(), target)
    1281          self.assertEqual(os.getcwd(), old_cwd)
    1282  
    1283      def test_reentrant(self):
    1284          old_cwd = os.getcwd()
    1285          target1 = self.make_relative_path('data')
    1286          target2 = self.make_relative_path('ziptestdata')
    1287          self.assertNotIn(old_cwd, (target1, target2))
    1288          chdir1, chdir2 = chdir(target1), chdir(target2)
    1289  
    1290          with chdir1:
    1291              self.assertEqual(os.getcwd(), target1)
    1292              with chdir2:
    1293                  self.assertEqual(os.getcwd(), target2)
    1294                  with chdir1:
    1295                      self.assertEqual(os.getcwd(), target1)
    1296                  self.assertEqual(os.getcwd(), target2)
    1297              self.assertEqual(os.getcwd(), target1)
    1298          self.assertEqual(os.getcwd(), old_cwd)
    1299  
    1300      def test_exception(self):
    1301          old_cwd = os.getcwd()
    1302          target = self.make_relative_path('data')
    1303          self.assertNotEqual(old_cwd, target)
    1304  
    1305          try:
    1306              with chdir(target):
    1307                  self.assertEqual(os.getcwd(), target)
    1308                  raise RuntimeError("boom")
    1309          except RuntimeError as re:
    1310              self.assertEqual(str(re), "boom")
    1311          self.assertEqual(os.getcwd(), old_cwd)
    1312  
    1313  
    1314  if __name__ == "__main__":
    1315      unittest.main()