python (3.11.7)
1 """Unit tests for contextlib.py, and other context managers."""
2
3 import io
4 import os
5 import sys
6 import tempfile
7 import threading
8 import traceback
9 import unittest
10 from contextlib import * # Tests __all__
11 from test import support
12 from test.support import os_helper
13 import weakref
14
15
class TestAbstractContextManager(unittest.TestCase):
    # Covers the AbstractContextManager ABC: the inherited __enter__, the
    # abstract __exit__, and the structural issubclass() check.

    def test_enter(self):
        # The __enter__ inherited from the ABC hands back the manager itself.
        class DefaultEnter(AbstractContextManager):
            def __exit__(self, *exc_info):
                super().__exit__(*exc_info)

        cm = DefaultEnter()
        entered = cm.__enter__()
        self.assertIs(entered, cm)

    def test_exit_is_abstract(self):
        # Leaving __exit__ undefined must make instantiation fail.
        class MissingExit(AbstractContextManager):
            pass

        with self.assertRaises(TypeError):
            MissingExit()

    def test_structural_subclassing(self):
        # A class defining both protocol methods, without inheriting the ABC.
        class ManagerFromScratch:
            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc_value, traceback):
                return None

        # A real subclass that only supplies __exit__.
        class DefaultEnter(AbstractContextManager):
            def __exit__(self, *exc_info):
                super().__exit__(*exc_info)

        # Setting a protocol method to None opts out of the protocol.
        class NoEnter(ManagerFromScratch):
            __enter__ = None

        class NoExit(ManagerFromScratch):
            __exit__ = None

        for accepted in (ManagerFromScratch, DefaultEnter):
            self.assertTrue(issubclass(accepted, AbstractContextManager))

        for rejected in (NoEnter, NoExit):
            self.assertFalse(issubclass(rejected, AbstractContextManager))
57
58
59 class ESC[4;38;5;81mContextManagerTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
60
61 def test_contextmanager_plain(self):
62 state = []
63 @contextmanager
64 def woohoo():
65 state.append(1)
66 yield 42
67 state.append(999)
68 with woohoo() as x:
69 self.assertEqual(state, [1])
70 self.assertEqual(x, 42)
71 state.append(x)
72 self.assertEqual(state, [1, 42, 999])
73
74 def test_contextmanager_finally(self):
75 state = []
76 @contextmanager
77 def woohoo():
78 state.append(1)
79 try:
80 yield 42
81 finally:
82 state.append(999)
83 with self.assertRaises(ZeroDivisionError):
84 with woohoo() as x:
85 self.assertEqual(state, [1])
86 self.assertEqual(x, 42)
87 state.append(x)
88 raise ZeroDivisionError()
89 self.assertEqual(state, [1, 42, 999])
90
91 def test_contextmanager_traceback(self):
92 @contextmanager
93 def f():
94 yield
95
96 try:
97 with f():
98 1/0
99 except ZeroDivisionError as e:
100 frames = traceback.extract_tb(e.__traceback__)
101
102 self.assertEqual(len(frames), 1)
103 self.assertEqual(frames[0].name, 'test_contextmanager_traceback')
104 self.assertEqual(frames[0].line, '1/0')
105
106 # Repeat with RuntimeError (which goes through a different code path)
107 class ESC[4;38;5;81mRuntimeErrorSubclass(ESC[4;38;5;149mRuntimeError):
108 pass
109
110 try:
111 with f():
112 raise RuntimeErrorSubclass(42)
113 except RuntimeErrorSubclass as e:
114 frames = traceback.extract_tb(e.__traceback__)
115
116 self.assertEqual(len(frames), 1)
117 self.assertEqual(frames[0].name, 'test_contextmanager_traceback')
118 self.assertEqual(frames[0].line, 'raise RuntimeErrorSubclass(42)')
119
120 class ESC[4;38;5;81mStopIterationSubclass(ESC[4;38;5;149mStopIteration):
121 pass
122
123 for stop_exc in (
124 StopIteration('spam'),
125 StopIterationSubclass('spam'),
126 ):
127 with self.subTest(type=type(stop_exc)):
128 try:
129 with f():
130 raise stop_exc
131 except type(stop_exc) as e:
132 self.assertIs(e, stop_exc)
133 frames = traceback.extract_tb(e.__traceback__)
134 else:
135 self.fail(f'{stop_exc} was suppressed')
136
137 self.assertEqual(len(frames), 1)
138 self.assertEqual(frames[0].name, 'test_contextmanager_traceback')
139 self.assertEqual(frames[0].line, 'raise stop_exc')
140
141 def test_contextmanager_no_reraise(self):
142 @contextmanager
143 def whee():
144 yield
145 ctx = whee()
146 ctx.__enter__()
147 # Calling __exit__ should not result in an exception
148 self.assertFalse(ctx.__exit__(TypeError, TypeError("foo"), None))
149
150 def test_contextmanager_trap_yield_after_throw(self):
151 @contextmanager
152 def whoo():
153 try:
154 yield
155 except:
156 yield
157 ctx = whoo()
158 ctx.__enter__()
159 with self.assertRaises(RuntimeError):
160 ctx.__exit__(TypeError, TypeError("foo"), None)
161 if support.check_impl_detail(cpython=True):
162 # The "gen" attribute is an implementation detail.
163 self.assertFalse(ctx.gen.gi_suspended)
164
165 def test_contextmanager_trap_no_yield(self):
166 @contextmanager
167 def whoo():
168 if False:
169 yield
170 ctx = whoo()
171 with self.assertRaises(RuntimeError):
172 ctx.__enter__()
173
174 def test_contextmanager_trap_second_yield(self):
175 @contextmanager
176 def whoo():
177 yield
178 yield
179 ctx = whoo()
180 ctx.__enter__()
181 with self.assertRaises(RuntimeError):
182 ctx.__exit__(None, None, None)
183 if support.check_impl_detail(cpython=True):
184 # The "gen" attribute is an implementation detail.
185 self.assertFalse(ctx.gen.gi_suspended)
186
187 def test_contextmanager_non_normalised(self):
188 @contextmanager
189 def whoo():
190 try:
191 yield
192 except RuntimeError:
193 raise SyntaxError
194
195 ctx = whoo()
196 ctx.__enter__()
197 with self.assertRaises(SyntaxError):
198 ctx.__exit__(RuntimeError, None, None)
199
200 def test_contextmanager_except(self):
201 state = []
202 @contextmanager
203 def woohoo():
204 state.append(1)
205 try:
206 yield 42
207 except ZeroDivisionError as e:
208 state.append(e.args[0])
209 self.assertEqual(state, [1, 42, 999])
210 with woohoo() as x:
211 self.assertEqual(state, [1])
212 self.assertEqual(x, 42)
213 state.append(x)
214 raise ZeroDivisionError(999)
215 self.assertEqual(state, [1, 42, 999])
216
217 def test_contextmanager_except_stopiter(self):
218 @contextmanager
219 def woohoo():
220 yield
221
222 class ESC[4;38;5;81mStopIterationSubclass(ESC[4;38;5;149mStopIteration):
223 pass
224
225 for stop_exc in (StopIteration('spam'), StopIterationSubclass('spam')):
226 with self.subTest(type=type(stop_exc)):
227 try:
228 with woohoo():
229 raise stop_exc
230 except Exception as ex:
231 self.assertIs(ex, stop_exc)
232 else:
233 self.fail(f'{stop_exc} was suppressed')
234
235 def test_contextmanager_except_pep479(self):
236 code = """\
237 from __future__ import generator_stop
238 from contextlib import contextmanager
239 @contextmanager
240 def woohoo():
241 yield
242 """
243 locals = {}
244 exec(code, locals, locals)
245 woohoo = locals['woohoo']
246
247 stop_exc = StopIteration('spam')
248 try:
249 with woohoo():
250 raise stop_exc
251 except Exception as ex:
252 self.assertIs(ex, stop_exc)
253 else:
254 self.fail('StopIteration was suppressed')
255
256 def test_contextmanager_do_not_unchain_non_stopiteration_exceptions(self):
257 @contextmanager
258 def test_issue29692():
259 try:
260 yield
261 except Exception as exc:
262 raise RuntimeError('issue29692:Chained') from exc
263 try:
264 with test_issue29692():
265 raise ZeroDivisionError
266 except Exception as ex:
267 self.assertIs(type(ex), RuntimeError)
268 self.assertEqual(ex.args[0], 'issue29692:Chained')
269 self.assertIsInstance(ex.__cause__, ZeroDivisionError)
270
271 try:
272 with test_issue29692():
273 raise StopIteration('issue29692:Unchained')
274 except Exception as ex:
275 self.assertIs(type(ex), StopIteration)
276 self.assertEqual(ex.args[0], 'issue29692:Unchained')
277 self.assertIsNone(ex.__cause__)
278
279 def test_contextmanager_wrap_runtimeerror(self):
280 @contextmanager
281 def woohoo():
282 try:
283 yield
284 except Exception as exc:
285 raise RuntimeError(f'caught {exc}') from exc
286
287 with self.assertRaises(RuntimeError):
288 with woohoo():
289 1 / 0
290
291 # If the context manager wrapped StopIteration in a RuntimeError,
292 # we also unwrap it, because we can't tell whether the wrapping was
293 # done by the generator machinery or by the generator itself.
294 with self.assertRaises(StopIteration):
295 with woohoo():
296 raise StopIteration
297
298 def _create_contextmanager_attribs(self):
299 def attribs(**kw):
300 def decorate(func):
301 for k,v in kw.items():
302 setattr(func,k,v)
303 return func
304 return decorate
305 @contextmanager
306 @attribs(foo='bar')
307 def baz(spam):
308 """Whee!"""
309 yield
310 return baz
311
312 def test_contextmanager_attribs(self):
313 baz = self._create_contextmanager_attribs()
314 self.assertEqual(baz.__name__,'baz')
315 self.assertEqual(baz.foo, 'bar')
316
317 @support.requires_docstrings
318 def test_contextmanager_doc_attrib(self):
319 baz = self._create_contextmanager_attribs()
320 self.assertEqual(baz.__doc__, "Whee!")
321
322 @support.requires_docstrings
323 def test_instance_docstring_given_cm_docstring(self):
324 baz = self._create_contextmanager_attribs()(None)
325 self.assertEqual(baz.__doc__, "Whee!")
326
327 def test_keywords(self):
328 # Ensure no keyword arguments are inhibited
329 @contextmanager
330 def woohoo(self, func, args, kwds):
331 yield (self, func, args, kwds)
332 with woohoo(self=11, func=22, args=33, kwds=44) as target:
333 self.assertEqual(target, (11, 22, 33, 44))
334
335 def test_nokeepref(self):
336 class ESC[4;38;5;81mA:
337 pass
338
339 @contextmanager
340 def woohoo(a, b):
341 a = weakref.ref(a)
342 b = weakref.ref(b)
343 # Allow test to work with a non-refcounted GC
344 support.gc_collect()
345 self.assertIsNone(a())
346 self.assertIsNone(b())
347 yield
348
349 with woohoo(A(), b=A()):
350 pass
351
352 def test_param_errors(self):
353 @contextmanager
354 def woohoo(a, *, b):
355 yield
356
357 with self.assertRaises(TypeError):
358 woohoo()
359 with self.assertRaises(TypeError):
360 woohoo(3, 5)
361 with self.assertRaises(TypeError):
362 woohoo(b=3)
363
364 def test_recursive(self):
365 depth = 0
366 ncols = 0
367 @contextmanager
368 def woohoo():
369 nonlocal ncols
370 ncols += 1
371 nonlocal depth
372 before = depth
373 depth += 1
374 yield
375 depth -= 1
376 self.assertEqual(depth, before)
377
378 @woohoo()
379 def recursive():
380 if depth < 10:
381 recursive()
382
383 recursive()
384 self.assertEqual(ncols, 10)
385 self.assertEqual(depth, 0)
386
387
class ClosingTestCase(unittest.TestCase):
    # Tests for contextlib.closing.

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        instance = closing(None)
        self.assertEqual(instance.__doc__, closing.__doc__)

    def test_closing(self):
        # close() is called exactly once when the block exits normally.
        close_calls = []

        class C:
            def close(self):
                close_calls.append(1)

        resource = C()
        self.assertEqual(close_calls, [])
        with closing(resource) as bound:
            self.assertEqual(resource, bound)
        self.assertEqual(close_calls, [1])

    def test_closing_error(self):
        # close() is still called when the block raises, and the exception
        # propagates.
        close_calls = []

        class C:
            def close(self):
                close_calls.append(1)

        resource = C()
        self.assertEqual(close_calls, [])
        with self.assertRaises(ZeroDivisionError):
            with closing(resource) as bound:
                self.assertEqual(resource, bound)
                1 / 0
        self.assertEqual(close_calls, [1])
420
421
class NullcontextTestCase(unittest.TestCase):
    # Tests for contextlib.nullcontext.

    def test_nullcontext(self):
        # The object passed to nullcontext() is what the "as" target receives.
        class C:
            pass

        payload = C()
        with nullcontext(payload) as entered:
            self.assertIs(entered, payload)
429
430
431 class ESC[4;38;5;81mFileContextTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
432
433 def testWithOpen(self):
434 tfn = tempfile.mktemp()
435 try:
436 f = None
437 with open(tfn, "w", encoding="utf-8") as f:
438 self.assertFalse(f.closed)
439 f.write("Booh\n")
440 self.assertTrue(f.closed)
441 f = None
442 with self.assertRaises(ZeroDivisionError):
443 with open(tfn, "r", encoding="utf-8") as f:
444 self.assertFalse(f.closed)
445 self.assertEqual(f.read(), "Booh\n")
446 1 / 0
447 self.assertTrue(f.closed)
448 finally:
449 os_helper.unlink(tfn)
450
class LockContextTestCase(unittest.TestCase):
    # Checks that the threading synchronisation primitives work as context
    # managers.

    def boilerPlate(self, lock, locked):
        # Shared scenario. `lock` is the object used in the with statement;
        # `locked` is a zero-argument callable reporting whether it is held.
        self.assertFalse(locked())
        with lock:
            self.assertTrue(locked())
        self.assertFalse(locked())

        # The lock must also be released when the body raises.
        with self.assertRaises(ZeroDivisionError):
            with lock:
                self.assertTrue(locked())
                1 / 0
        self.assertFalse(locked())

    def testWithLock(self):
        mutex = threading.Lock()
        self.boilerPlate(mutex, mutex.locked)

    def testWithRLock(self):
        rlock = threading.RLock()
        self.boilerPlate(rlock, rlock._is_owned)

    def testWithCondition(self):
        cond = threading.Condition()
        self.boilerPlate(cond, lambda: cond._is_owned())

    def testWithSemaphore(self):
        sem = threading.Semaphore()

        def locked():
            # Probe with a non-blocking acquire; undo it if it succeeded.
            got_it = sem.acquire(False)
            if got_it:
                sem.release()
            return not got_it

        self.boilerPlate(sem, locked)

    def testWithBoundedSemaphore(self):
        sem = threading.BoundedSemaphore()

        def locked():
            # Probe with a non-blocking acquire; undo it if it succeeded.
            got_it = sem.acquire(False)
            if got_it:
                sem.release()
            return not got_it

        self.boilerPlate(sem, locked)
497
498
class mycontext(ContextDecorator):
    """Example decoration-compatible context manager for testing"""
    # Class-level defaults; instances overwrite them as they are used.
    catch = False    # value returned from __exit__ (truthy suppresses)
    exc = None       # the argument tuple received by the last __exit__
    started = False  # set to True by __enter__

    def __enter__(self):
        self.started = True
        return self

    def __exit__(self, *exc):
        # Record what was passed in, then report whether to suppress it.
        suppress = self.catch
        self.exc = exc
        return suppress
512
513
514 class ESC[4;38;5;81mTestContextDecorator(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
515
516 @support.requires_docstrings
517 def test_instance_docs(self):
518 # Issue 19330: ensure context manager instances have good docstrings
519 cm_docstring = mycontext.__doc__
520 obj = mycontext()
521 self.assertEqual(obj.__doc__, cm_docstring)
522
523 def test_contextdecorator(self):
524 context = mycontext()
525 with context as result:
526 self.assertIs(result, context)
527 self.assertTrue(context.started)
528
529 self.assertEqual(context.exc, (None, None, None))
530
531
532 def test_contextdecorator_with_exception(self):
533 context = mycontext()
534
535 with self.assertRaisesRegex(NameError, 'foo'):
536 with context:
537 raise NameError('foo')
538 self.assertIsNotNone(context.exc)
539 self.assertIs(context.exc[0], NameError)
540
541 context = mycontext()
542 context.catch = True
543 with context:
544 raise NameError('foo')
545 self.assertIsNotNone(context.exc)
546 self.assertIs(context.exc[0], NameError)
547
548
549 def test_decorator(self):
550 context = mycontext()
551
552 @context
553 def test():
554 self.assertIsNone(context.exc)
555 self.assertTrue(context.started)
556 test()
557 self.assertEqual(context.exc, (None, None, None))
558
559
560 def test_decorator_with_exception(self):
561 context = mycontext()
562
563 @context
564 def test():
565 self.assertIsNone(context.exc)
566 self.assertTrue(context.started)
567 raise NameError('foo')
568
569 with self.assertRaisesRegex(NameError, 'foo'):
570 test()
571 self.assertIsNotNone(context.exc)
572 self.assertIs(context.exc[0], NameError)
573
574
575 def test_decorating_method(self):
576 context = mycontext()
577
578 class ESC[4;38;5;81mTest(ESC[4;38;5;149mobject):
579
580 @context
581 def method(self, a, b, c=None):
582 self.a = a
583 self.b = b
584 self.c = c
585
586 # these tests are for argument passing when used as a decorator
587 test = Test()
588 test.method(1, 2)
589 self.assertEqual(test.a, 1)
590 self.assertEqual(test.b, 2)
591 self.assertEqual(test.c, None)
592
593 test = Test()
594 test.method('a', 'b', 'c')
595 self.assertEqual(test.a, 'a')
596 self.assertEqual(test.b, 'b')
597 self.assertEqual(test.c, 'c')
598
599 test = Test()
600 test.method(a=1, b=2)
601 self.assertEqual(test.a, 1)
602 self.assertEqual(test.b, 2)
603
604
605 def test_typo_enter(self):
606 class ESC[4;38;5;81mmycontext(ESC[4;38;5;149mContextDecorator):
607 def __unter__(self):
608 pass
609 def __exit__(self, *exc):
610 pass
611
612 with self.assertRaisesRegex(TypeError, 'the context manager'):
613 with mycontext():
614 pass
615
616
617 def test_typo_exit(self):
618 class ESC[4;38;5;81mmycontext(ESC[4;38;5;149mContextDecorator):
619 def __enter__(self):
620 pass
621 def __uxit__(self, *exc):
622 pass
623
624 with self.assertRaisesRegex(TypeError, 'the context manager.*__exit__'):
625 with mycontext():
626 pass
627
628
629 def test_contextdecorator_as_mixin(self):
630 class ESC[4;38;5;81msomecontext(ESC[4;38;5;149mobject):
631 started = False
632 exc = None
633
634 def __enter__(self):
635 self.started = True
636 return self
637
638 def __exit__(self, *exc):
639 self.exc = exc
640
641 class ESC[4;38;5;81mmycontext(ESC[4;38;5;149msomecontext, ESC[4;38;5;149mContextDecorator):
642 pass
643
644 context = mycontext()
645 @context
646 def test():
647 self.assertIsNone(context.exc)
648 self.assertTrue(context.started)
649 test()
650 self.assertEqual(context.exc, (None, None, None))
651
652
653 def test_contextmanager_as_decorator(self):
654 @contextmanager
655 def woohoo(y):
656 state.append(y)
657 yield
658 state.append(999)
659
660 state = []
661 @woohoo(1)
662 def test(x):
663 self.assertEqual(state, [1])
664 state.append(x)
665 test('something')
666 self.assertEqual(state, [1, 'something', 999])
667
668 # Issue #11647: Ensure the decorated function is 'reusable'
669 state = []
670 test('something else')
671 self.assertEqual(state, [1, 'something else', 999])
672
673
674 class ESC[4;38;5;81mTestBaseExitStack:
675 exit_stack = None
676
677 @support.requires_docstrings
678 def test_instance_docs(self):
679 # Issue 19330: ensure context manager instances have good docstrings
680 cm_docstring = self.exit_stack.__doc__
681 obj = self.exit_stack()
682 self.assertEqual(obj.__doc__, cm_docstring)
683
684 def test_no_resources(self):
685 with self.exit_stack():
686 pass
687
688 def test_callback(self):
689 expected = [
690 ((), {}),
691 ((1,), {}),
692 ((1,2), {}),
693 ((), dict(example=1)),
694 ((1,), dict(example=1)),
695 ((1,2), dict(example=1)),
696 ((1,2), dict(self=3, callback=4)),
697 ]
698 result = []
699 def _exit(*args, **kwds):
700 """Test metadata propagation"""
701 result.append((args, kwds))
702 with self.exit_stack() as stack:
703 for args, kwds in reversed(expected):
704 if args and kwds:
705 f = stack.callback(_exit, *args, **kwds)
706 elif args:
707 f = stack.callback(_exit, *args)
708 elif kwds:
709 f = stack.callback(_exit, **kwds)
710 else:
711 f = stack.callback(_exit)
712 self.assertIs(f, _exit)
713 for wrapper in stack._exit_callbacks:
714 self.assertIs(wrapper[1].__wrapped__, _exit)
715 self.assertNotEqual(wrapper[1].__name__, _exit.__name__)
716 self.assertIsNone(wrapper[1].__doc__, _exit.__doc__)
717 self.assertEqual(result, expected)
718
719 result = []
720 with self.exit_stack() as stack:
721 with self.assertRaises(TypeError):
722 stack.callback(arg=1)
723 with self.assertRaises(TypeError):
724 self.exit_stack.callback(arg=2)
725 with self.assertRaises(TypeError):
726 stack.callback(callback=_exit, arg=3)
727 self.assertEqual(result, [])
728
729 def test_push(self):
730 exc_raised = ZeroDivisionError
731 def _expect_exc(exc_type, exc, exc_tb):
732 self.assertIs(exc_type, exc_raised)
733 def _suppress_exc(*exc_details):
734 return True
735 def _expect_ok(exc_type, exc, exc_tb):
736 self.assertIsNone(exc_type)
737 self.assertIsNone(exc)
738 self.assertIsNone(exc_tb)
739 class ESC[4;38;5;81mExitCM(ESC[4;38;5;149mobject):
740 def __init__(self, check_exc):
741 self.check_exc = check_exc
742 def __enter__(self):
743 self.fail("Should not be called!")
744 def __exit__(self, *exc_details):
745 self.check_exc(*exc_details)
746 with self.exit_stack() as stack:
747 stack.push(_expect_ok)
748 self.assertIs(stack._exit_callbacks[-1][1], _expect_ok)
749 cm = ExitCM(_expect_ok)
750 stack.push(cm)
751 self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
752 stack.push(_suppress_exc)
753 self.assertIs(stack._exit_callbacks[-1][1], _suppress_exc)
754 cm = ExitCM(_expect_exc)
755 stack.push(cm)
756 self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
757 stack.push(_expect_exc)
758 self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)
759 stack.push(_expect_exc)
760 self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)
761 1/0
762
763 def test_enter_context(self):
764 class ESC[4;38;5;81mTestCM(ESC[4;38;5;149mobject):
765 def __enter__(self):
766 result.append(1)
767 def __exit__(self, *exc_details):
768 result.append(3)
769
770 result = []
771 cm = TestCM()
772 with self.exit_stack() as stack:
773 @stack.callback # Registered first => cleaned up last
774 def _exit():
775 result.append(4)
776 self.assertIsNotNone(_exit)
777 stack.enter_context(cm)
778 self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
779 result.append(2)
780 self.assertEqual(result, [1, 2, 3, 4])
781
782 def test_enter_context_errors(self):
783 class ESC[4;38;5;81mLacksEnterAndExit:
784 pass
785 class ESC[4;38;5;81mLacksEnter:
786 def __exit__(self, *exc_info):
787 pass
788 class ESC[4;38;5;81mLacksExit:
789 def __enter__(self):
790 pass
791
792 with self.exit_stack() as stack:
793 with self.assertRaisesRegex(TypeError, 'the context manager'):
794 stack.enter_context(LacksEnterAndExit())
795 with self.assertRaisesRegex(TypeError, 'the context manager'):
796 stack.enter_context(LacksEnter())
797 with self.assertRaisesRegex(TypeError, 'the context manager'):
798 stack.enter_context(LacksExit())
799 self.assertFalse(stack._exit_callbacks)
800
801 def test_close(self):
802 result = []
803 with self.exit_stack() as stack:
804 @stack.callback
805 def _exit():
806 result.append(1)
807 self.assertIsNotNone(_exit)
808 stack.close()
809 result.append(2)
810 self.assertEqual(result, [1, 2])
811
812 def test_pop_all(self):
813 result = []
814 with self.exit_stack() as stack:
815 @stack.callback
816 def _exit():
817 result.append(3)
818 self.assertIsNotNone(_exit)
819 new_stack = stack.pop_all()
820 result.append(1)
821 result.append(2)
822 new_stack.close()
823 self.assertEqual(result, [1, 2, 3])
824
825 def test_exit_raise(self):
826 with self.assertRaises(ZeroDivisionError):
827 with self.exit_stack() as stack:
828 stack.push(lambda *exc: False)
829 1/0
830
831 def test_exit_suppress(self):
832 with self.exit_stack() as stack:
833 stack.push(lambda *exc: True)
834 1/0
835
836 def test_exit_exception_traceback(self):
837 # This test captures the current behavior of ExitStack so that we know
838 # if we ever unintendedly change it. It is not a statement of what the
839 # desired behavior is (for instance, we may want to remove some of the
840 # internal contextlib frames).
841
842 def raise_exc(exc):
843 raise exc
844
845 try:
846 with self.exit_stack() as stack:
847 stack.callback(raise_exc, ValueError)
848 1/0
849 except ValueError as e:
850 exc = e
851
852 self.assertIsInstance(exc, ValueError)
853 ve_frames = traceback.extract_tb(exc.__traceback__)
854 expected = \
855 [('test_exit_exception_traceback', 'with self.exit_stack() as stack:')] + \
856 self.callback_error_internal_frames + \
857 [('_exit_wrapper', 'callback(*args, **kwds)'),
858 ('raise_exc', 'raise exc')]
859
860 self.assertEqual(
861 [(f.name, f.line) for f in ve_frames], expected)
862
863 self.assertIsInstance(exc.__context__, ZeroDivisionError)
864 zde_frames = traceback.extract_tb(exc.__context__.__traceback__)
865 self.assertEqual([(f.name, f.line) for f in zde_frames],
866 [('test_exit_exception_traceback', '1/0')])
867
868 def test_exit_exception_chaining_reference(self):
869 # Sanity check to make sure that ExitStack chaining matches
870 # actual nested with statements
871 class ESC[4;38;5;81mRaiseExc:
872 def __init__(self, exc):
873 self.exc = exc
874 def __enter__(self):
875 return self
876 def __exit__(self, *exc_details):
877 raise self.exc
878
879 class ESC[4;38;5;81mRaiseExcWithContext:
880 def __init__(self, outer, inner):
881 self.outer = outer
882 self.inner = inner
883 def __enter__(self):
884 return self
885 def __exit__(self, *exc_details):
886 try:
887 raise self.inner
888 except:
889 raise self.outer
890
891 class ESC[4;38;5;81mSuppressExc:
892 def __enter__(self):
893 return self
894 def __exit__(self, *exc_details):
895 type(self).saved_details = exc_details
896 return True
897
898 try:
899 with RaiseExc(IndexError):
900 with RaiseExcWithContext(KeyError, AttributeError):
901 with SuppressExc():
902 with RaiseExc(ValueError):
903 1 / 0
904 except IndexError as exc:
905 self.assertIsInstance(exc.__context__, KeyError)
906 self.assertIsInstance(exc.__context__.__context__, AttributeError)
907 # Inner exceptions were suppressed
908 self.assertIsNone(exc.__context__.__context__.__context__)
909 else:
910 self.fail("Expected IndexError, but no exception was raised")
911 # Check the inner exceptions
912 inner_exc = SuppressExc.saved_details[1]
913 self.assertIsInstance(inner_exc, ValueError)
914 self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
915
916 def test_exit_exception_chaining(self):
917 # Ensure exception chaining matches the reference behaviour
918 def raise_exc(exc):
919 raise exc
920
921 saved_details = None
922 def suppress_exc(*exc_details):
923 nonlocal saved_details
924 saved_details = exc_details
925 return True
926
927 try:
928 with self.exit_stack() as stack:
929 stack.callback(raise_exc, IndexError)
930 stack.callback(raise_exc, KeyError)
931 stack.callback(raise_exc, AttributeError)
932 stack.push(suppress_exc)
933 stack.callback(raise_exc, ValueError)
934 1 / 0
935 except IndexError as exc:
936 self.assertIsInstance(exc.__context__, KeyError)
937 self.assertIsInstance(exc.__context__.__context__, AttributeError)
938 # Inner exceptions were suppressed
939 self.assertIsNone(exc.__context__.__context__.__context__)
940 else:
941 self.fail("Expected IndexError, but no exception was raised")
942 # Check the inner exceptions
943 inner_exc = saved_details[1]
944 self.assertIsInstance(inner_exc, ValueError)
945 self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
946
947 def test_exit_exception_explicit_none_context(self):
948 # Ensure ExitStack chaining matches actual nested `with` statements
949 # regarding explicit __context__ = None.
950
951 class ESC[4;38;5;81mMyException(ESC[4;38;5;149mException):
952 pass
953
954 @contextmanager
955 def my_cm():
956 try:
957 yield
958 except BaseException:
959 exc = MyException()
960 try:
961 raise exc
962 finally:
963 exc.__context__ = None
964
965 @contextmanager
966 def my_cm_with_exit_stack():
967 with self.exit_stack() as stack:
968 stack.enter_context(my_cm())
969 yield stack
970
971 for cm in (my_cm, my_cm_with_exit_stack):
972 with self.subTest():
973 try:
974 with cm():
975 raise IndexError()
976 except MyException as exc:
977 self.assertIsNone(exc.__context__)
978 else:
979 self.fail("Expected IndexError, but no exception was raised")
980
981 def test_exit_exception_non_suppressing(self):
982 # http://bugs.python.org/issue19092
983 def raise_exc(exc):
984 raise exc
985
986 def suppress_exc(*exc_details):
987 return True
988
989 try:
990 with self.exit_stack() as stack:
991 stack.callback(lambda: None)
992 stack.callback(raise_exc, IndexError)
993 except Exception as exc:
994 self.assertIsInstance(exc, IndexError)
995 else:
996 self.fail("Expected IndexError, but no exception was raised")
997
998 try:
999 with self.exit_stack() as stack:
1000 stack.callback(raise_exc, KeyError)
1001 stack.push(suppress_exc)
1002 stack.callback(raise_exc, IndexError)
1003 except Exception as exc:
1004 self.assertIsInstance(exc, KeyError)
1005 else:
1006 self.fail("Expected KeyError, but no exception was raised")
1007
1008 def test_exit_exception_with_correct_context(self):
1009 # http://bugs.python.org/issue20317
1010 @contextmanager
1011 def gets_the_context_right(exc):
1012 try:
1013 yield
1014 finally:
1015 raise exc
1016
1017 exc1 = Exception(1)
1018 exc2 = Exception(2)
1019 exc3 = Exception(3)
1020 exc4 = Exception(4)
1021
1022 # The contextmanager already fixes the context, so prior to the
1023 # fix, ExitStack would try to fix it *again* and get into an
1024 # infinite self-referential loop
1025 try:
1026 with self.exit_stack() as stack:
1027 stack.enter_context(gets_the_context_right(exc4))
1028 stack.enter_context(gets_the_context_right(exc3))
1029 stack.enter_context(gets_the_context_right(exc2))
1030 raise exc1
1031 except Exception as exc:
1032 self.assertIs(exc, exc4)
1033 self.assertIs(exc.__context__, exc3)
1034 self.assertIs(exc.__context__.__context__, exc2)
1035 self.assertIs(exc.__context__.__context__.__context__, exc1)
1036 self.assertIsNone(
1037 exc.__context__.__context__.__context__.__context__)
1038
1039 def test_exit_exception_with_existing_context(self):
1040 # Addresses a lack of test coverage discovered after checking in a
1041 # fix for issue 20317 that still contained debugging code.
1042 def raise_nested(inner_exc, outer_exc):
1043 try:
1044 raise inner_exc
1045 finally:
1046 raise outer_exc
1047 exc1 = Exception(1)
1048 exc2 = Exception(2)
1049 exc3 = Exception(3)
1050 exc4 = Exception(4)
1051 exc5 = Exception(5)
1052 try:
1053 with self.exit_stack() as stack:
1054 stack.callback(raise_nested, exc4, exc5)
1055 stack.callback(raise_nested, exc2, exc3)
1056 raise exc1
1057 except Exception as exc:
1058 self.assertIs(exc, exc5)
1059 self.assertIs(exc.__context__, exc4)
1060 self.assertIs(exc.__context__.__context__, exc3)
1061 self.assertIs(exc.__context__.__context__.__context__, exc2)
1062 self.assertIs(
1063 exc.__context__.__context__.__context__.__context__, exc1)
1064 self.assertIsNone(
1065 exc.__context__.__context__.__context__.__context__.__context__)
1066
1067 def test_body_exception_suppress(self):
1068 def suppress_exc(*exc_details):
1069 return True
1070 try:
1071 with self.exit_stack() as stack:
1072 stack.push(suppress_exc)
1073 1/0
1074 except IndexError as exc:
1075 self.fail("Expected no exception, got IndexError")
1076
1077 def test_exit_exception_chaining_suppress(self):
1078 with self.exit_stack() as stack:
1079 stack.push(lambda *exc: True)
1080 stack.push(lambda *exc: 1/0)
1081 stack.push(lambda *exc: {}[1])
1082
1083 def test_excessive_nesting(self):
1084 # The original implementation would die with RecursionError here
1085 with self.exit_stack() as stack:
1086 for i in range(10000):
1087 stack.callback(int)
1088
1089 def test_instance_bypass(self):
1090 class ESC[4;38;5;81mExample(ESC[4;38;5;149mobject): pass
1091 cm = Example()
1092 cm.__enter__ = object()
1093 cm.__exit__ = object()
1094 stack = self.exit_stack()
1095 with self.assertRaisesRegex(TypeError, 'the context manager'):
1096 stack.enter_context(cm)
1097 stack.push(cm)
1098 self.assertIs(stack._exit_callbacks[-1][1], cm)
1099
1100 def test_dont_reraise_RuntimeError(self):
1101 # https://bugs.python.org/issue27122
1102 class ESC[4;38;5;81mUniqueException(ESC[4;38;5;149mException): pass
1103 class ESC[4;38;5;81mUniqueRuntimeError(ESC[4;38;5;149mRuntimeError): pass
1104
1105 @contextmanager
1106 def second():
1107 try:
1108 yield 1
1109 except Exception as exc:
1110 raise UniqueException("new exception") from exc
1111
1112 @contextmanager
1113 def first():
1114 try:
1115 yield 1
1116 except Exception as exc:
1117 raise exc
1118
1119 # The UniqueRuntimeError should be caught by second()'s exception
1120 # handler which chain raised a new UniqueException.
1121 with self.assertRaises(UniqueException) as err_ctx:
1122 with self.exit_stack() as es_ctx:
1123 es_ctx.enter_context(second())
1124 es_ctx.enter_context(first())
1125 raise UniqueRuntimeError("please no infinite loop.")
1126
1127 exc = err_ctx.exception
1128 self.assertIsInstance(exc, UniqueException)
1129 self.assertIsInstance(exc.__context__, UniqueRuntimeError)
1130 self.assertIsNone(exc.__context__.__context__)
1131 self.assertIsNone(exc.__context__.__cause__)
1132 self.assertIs(exc.__cause__, exc.__context__)
1133
1134
class TestExitStack(TestBaseExitStack, unittest.TestCase):
    # Runs the shared TestBaseExitStack suite against contextlib.ExitStack.
    exit_stack = ExitStack

    # (function name, source line) pairs that test_exit_exception_traceback
    # expects between the test's own `with` frame and the `_exit_wrapper`
    # frame.
    callback_error_internal_frames = [
        ('__exit__', 'raise exc_details[1]'),
        ('__exit__', 'if cb(*exc_details):'),
    ]
1141
1142
class TestRedirectStream:
    """Shared tests for the stream-redirecting context managers.

    Concrete subclasses set ``redirect_stream`` to the context manager under
    test and ``orig_stream`` to the name of the ``sys`` attribute it swaps.
    """

    redirect_stream = None
    orig_stream = None

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        cm_docstring = self.redirect_stream.__doc__
        obj = self.redirect_stream(None)
        self.assertEqual(obj.__doc__, cm_docstring)

    def test_no_redirect_in_init(self):
        # Merely constructing the manager must leave the sys stream alone.
        orig_stdout = getattr(sys, self.orig_stream)
        self.redirect_stream(None)
        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)

    def test_redirect_to_string_io(self):
        f = io.StringIO()
        msg = "Consider an API like help(), which prints directly to stdout"
        orig_stdout = getattr(sys, self.orig_stream)
        with self.redirect_stream(f):
            print(msg, file=getattr(sys, self.orig_stream))
        # The original stream is back in place once the block exits.
        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
        s = f.getvalue().strip()
        self.assertEqual(s, msg)

    def test_enter_result_is_target(self):
        f = io.StringIO()
        with self.redirect_stream(f) as enter_result:
            self.assertIs(enter_result, f)

    def test_cm_is_reusable(self):
        # The same manager instance is entered twice, one after the other.
        f = io.StringIO()
        write_to_f = self.redirect_stream(f)
        orig_stdout = getattr(sys, self.orig_stream)
        with write_to_f:
            print("Hello", end=" ", file=getattr(sys, self.orig_stream))
        with write_to_f:
            print("World!", file=getattr(sys, self.orig_stream))
        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
        s = f.getvalue()
        self.assertEqual(s, "Hello World!\n")

    def test_cm_is_reentrant(self):
        # The same manager instance is entered again while already active.
        f = io.StringIO()
        write_to_f = self.redirect_stream(f)
        orig_stdout = getattr(sys, self.orig_stream)
        with write_to_f:
            print("Hello", end=" ", file=getattr(sys, self.orig_stream))
            with write_to_f:
                print("World!", file=getattr(sys, self.orig_stream))
        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
        s = f.getvalue()
        self.assertEqual(s, "Hello World!\n")
1198
1199
class TestRedirectStdout(TestRedirectStream, unittest.TestCase):
    # Run the shared redirect tests against redirect_stdout / sys.stdout.

    redirect_stream = redirect_stdout
    orig_stream = "stdout"
1204
1205
class TestRedirectStderr(TestRedirectStream, unittest.TestCase):
    # Run the shared redirect tests against redirect_stderr / sys.stderr.

    redirect_stream = redirect_stderr
    orig_stream = "stderr"
1210
1211
class TestSuppress(unittest.TestCase):
    """Tests for the ``suppress`` context manager."""

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        cm_docstring = suppress.__doc__
        obj = suppress()
        self.assertEqual(obj.__doc__, cm_docstring)

    def test_no_result_from_enter(self):
        with suppress(ValueError) as enter_result:
            self.assertIsNone(enter_result)

    def test_no_exception(self):
        with suppress(ValueError):
            self.assertEqual(pow(2, 5), 32)

    def test_exact_exception(self):
        # len(5) raises TypeError, which is exactly the suppressed type.
        with suppress(TypeError):
            len(5)

    def test_exception_hierarchy(self):
        # The IndexError raised here is a subclass of LookupError.
        with suppress(LookupError):
            'Hello'[50]

    def test_other_exception(self):
        with self.assertRaises(ZeroDivisionError):
            with suppress(TypeError):
                1/0

    def test_no_args(self):
        # With no exception types given, nothing is suppressed.
        with self.assertRaises(ZeroDivisionError):
            with suppress():
                1/0

    def test_multiple_exception_args(self):
        with suppress(ZeroDivisionError, TypeError):
            1/0
        with suppress(ZeroDivisionError, TypeError):
            len(5)

    def test_cm_is_reentrant(self):
        ignore_exceptions = suppress(Exception)
        with ignore_exceptions:
            pass
        with ignore_exceptions:
            len(5)
        with ignore_exceptions:
            with ignore_exceptions: # Check nested usage
                len(5)
            # Reached only if the inner block swallowed its TypeError.
            outer_continued = True
            1/0
        self.assertTrue(outer_continued)
1265
1266
class TestChdir(unittest.TestCase):
    """Tests for the ``chdir`` context manager."""

    def make_relative_path(self, *parts):
        # Build a path below the directory that contains this test file.
        return os.path.join(
            os.path.dirname(os.path.realpath(__file__)),
            *parts,
        )

    def test_simple(self):
        old_cwd = os.getcwd()
        target = self.make_relative_path('data')
        self.assertNotEqual(old_cwd, target)

        with chdir(target):
            self.assertEqual(os.getcwd(), target)
        self.assertEqual(os.getcwd(), old_cwd)

    def test_reentrant(self):
        old_cwd = os.getcwd()
        target1 = self.make_relative_path('data')
        target2 = self.make_relative_path('ziptestdata')
        self.assertNotIn(old_cwd, (target1, target2))
        chdir1, chdir2 = chdir(target1), chdir(target2)

        # chdir1 is entered a second time while it is still active; each
        # exit must restore the directory that was current on that entry.
        with chdir1:
            self.assertEqual(os.getcwd(), target1)
            with chdir2:
                self.assertEqual(os.getcwd(), target2)
                with chdir1:
                    self.assertEqual(os.getcwd(), target1)
                self.assertEqual(os.getcwd(), target2)
            self.assertEqual(os.getcwd(), target1)
        self.assertEqual(os.getcwd(), old_cwd)

    def test_exception(self):
        old_cwd = os.getcwd()
        target = self.make_relative_path('data')
        self.assertNotEqual(old_cwd, target)

        # The exception must propagate out of the block, and the previous
        # working directory must still be restored.
        try:
            with chdir(target):
                self.assertEqual(os.getcwd(), target)
                raise RuntimeError("boom")
        except RuntimeError as re:
            self.assertEqual(str(re), "boom")
        self.assertEqual(os.getcwd(), old_cwd)
1312
1313
# Run this module's tests when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()