python (3.11.7)

(root)/
lib/
python3.11/
unittest/
test/
test_async_case.py
       1  import asyncio
       2  import contextvars
       3  import unittest
       4  from test import support
       5  
       6  support.requires_working_socket(module=True)
       7  
       8  
       9  class ESC[4;38;5;81mMyException(ESC[4;38;5;149mException):
      10      pass
      11  
      12  
      13  def tearDownModule():
      14      asyncio.set_event_loop_policy(None)
      15  
      16  
      17  class ESC[4;38;5;81mTestCM:
      18      def __init__(self, ordering, enter_result=None):
      19          self.ordering = ordering
      20          self.enter_result = enter_result
      21  
      22      async def __aenter__(self):
      23          self.ordering.append('enter')
      24          return self.enter_result
      25  
      26      async def __aexit__(self, *exc_info):
      27          self.ordering.append('exit')
      28  
      29  
      30  class ESC[4;38;5;81mLacksEnterAndExit:
      31      pass
      32  class ESC[4;38;5;81mLacksEnter:
      33      async def __aexit__(self, *exc_info):
      34          pass
      35  class ESC[4;38;5;81mLacksExit:
      36      async def __aenter__(self):
      37          pass
      38  
      39  
      40  VAR = contextvars.ContextVar('VAR', default=())
      41  
      42  
      43  class ESC[4;38;5;81mTestAsyncCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      44      maxDiff = None
      45  
      46      def setUp(self):
      47          # Ensure that IsolatedAsyncioTestCase instances are destroyed before
      48          # starting a new event loop
      49          self.addCleanup(support.gc_collect)
      50  
      51      def test_full_cycle(self):
      52          expected = ['setUp',
      53                      'asyncSetUp',
      54                      'test',
      55                      'asyncTearDown',
      56                      'tearDown',
      57                      'cleanup6',
      58                      'cleanup5',
      59                      'cleanup4',
      60                      'cleanup3',
      61                      'cleanup2',
      62                      'cleanup1']
      63          class ESC[4;38;5;81mTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
      64              def setUp(self):
      65                  self.assertEqual(events, [])
      66                  events.append('setUp')
      67                  VAR.set(VAR.get() + ('setUp',))
      68                  self.addCleanup(self.on_cleanup1)
      69                  self.addAsyncCleanup(self.on_cleanup2)
      70  
      71              async def asyncSetUp(self):
      72                  self.assertEqual(events, expected[:1])
      73                  events.append('asyncSetUp')
      74                  VAR.set(VAR.get() + ('asyncSetUp',))
      75                  self.addCleanup(self.on_cleanup3)
      76                  self.addAsyncCleanup(self.on_cleanup4)
      77  
      78              async def test_func(self):
      79                  self.assertEqual(events, expected[:2])
      80                  events.append('test')
      81                  VAR.set(VAR.get() + ('test',))
      82                  self.addCleanup(self.on_cleanup5)
      83                  self.addAsyncCleanup(self.on_cleanup6)
      84  
      85              async def asyncTearDown(self):
      86                  self.assertEqual(events, expected[:3])
      87                  VAR.set(VAR.get() + ('asyncTearDown',))
      88                  events.append('asyncTearDown')
      89  
      90              def tearDown(self):
      91                  self.assertEqual(events, expected[:4])
      92                  events.append('tearDown')
      93                  VAR.set(VAR.get() + ('tearDown',))
      94  
      95              def on_cleanup1(self):
      96                  self.assertEqual(events, expected[:10])
      97                  events.append('cleanup1')
      98                  VAR.set(VAR.get() + ('cleanup1',))
      99                  nonlocal cvar
     100                  cvar = VAR.get()
     101  
     102              async def on_cleanup2(self):
     103                  self.assertEqual(events, expected[:9])
     104                  events.append('cleanup2')
     105                  VAR.set(VAR.get() + ('cleanup2',))
     106  
     107              def on_cleanup3(self):
     108                  self.assertEqual(events, expected[:8])
     109                  events.append('cleanup3')
     110                  VAR.set(VAR.get() + ('cleanup3',))
     111  
     112              async def on_cleanup4(self):
     113                  self.assertEqual(events, expected[:7])
     114                  events.append('cleanup4')
     115                  VAR.set(VAR.get() + ('cleanup4',))
     116  
     117              def on_cleanup5(self):
     118                  self.assertEqual(events, expected[:6])
     119                  events.append('cleanup5')
     120                  VAR.set(VAR.get() + ('cleanup5',))
     121  
     122              async def on_cleanup6(self):
     123                  self.assertEqual(events, expected[:5])
     124                  events.append('cleanup6')
     125                  VAR.set(VAR.get() + ('cleanup6',))
     126  
     127          events = []
     128          cvar = ()
     129          test = Test("test_func")
     130          result = test.run()
     131          self.assertEqual(result.errors, [])
     132          self.assertEqual(result.failures, [])
     133          self.assertEqual(events, expected)
     134          self.assertEqual(cvar, tuple(expected))
     135  
     136          events = []
     137          cvar = ()
     138          test = Test("test_func")
     139          test.debug()
     140          self.assertEqual(events, expected)
     141          self.assertEqual(cvar, tuple(expected))
     142          test.doCleanups()
     143          self.assertEqual(events, expected)
     144          self.assertEqual(cvar, tuple(expected))
     145  
     146      def test_exception_in_setup(self):
     147          class ESC[4;38;5;81mTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     148              async def asyncSetUp(self):
     149                  events.append('asyncSetUp')
     150                  self.addAsyncCleanup(self.on_cleanup)
     151                  raise MyException()
     152  
     153              async def test_func(self):
     154                  events.append('test')
     155  
     156              async def asyncTearDown(self):
     157                  events.append('asyncTearDown')
     158  
     159              async def on_cleanup(self):
     160                  events.append('cleanup')
     161  
     162  
     163          events = []
     164          test = Test("test_func")
     165          result = test.run()
     166          self.assertEqual(events, ['asyncSetUp', 'cleanup'])
     167          self.assertIs(result.errors[0][0], test)
     168          self.assertIn('MyException', result.errors[0][1])
     169  
     170          events = []
     171          test = Test("test_func")
     172          self.addCleanup(test._tearDownAsyncioRunner)
     173          try:
     174              test.debug()
     175          except MyException:
     176              pass
     177          else:
     178              self.fail('Expected a MyException exception')
     179          self.assertEqual(events, ['asyncSetUp'])
     180          test.doCleanups()
     181          self.assertEqual(events, ['asyncSetUp', 'cleanup'])
     182  
     183      def test_exception_in_test(self):
     184          class ESC[4;38;5;81mTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     185              async def asyncSetUp(self):
     186                  events.append('asyncSetUp')
     187  
     188              async def test_func(self):
     189                  events.append('test')
     190                  self.addAsyncCleanup(self.on_cleanup)
     191                  raise MyException()
     192  
     193              async def asyncTearDown(self):
     194                  events.append('asyncTearDown')
     195  
     196              async def on_cleanup(self):
     197                  events.append('cleanup')
     198  
     199          events = []
     200          test = Test("test_func")
     201          result = test.run()
     202          self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
     203          self.assertIs(result.errors[0][0], test)
     204          self.assertIn('MyException', result.errors[0][1])
     205  
     206          events = []
     207          test = Test("test_func")
     208          self.addCleanup(test._tearDownAsyncioRunner)
     209          try:
     210              test.debug()
     211          except MyException:
     212              pass
     213          else:
     214              self.fail('Expected a MyException exception')
     215          self.assertEqual(events, ['asyncSetUp', 'test'])
     216          test.doCleanups()
     217          self.assertEqual(events, ['asyncSetUp', 'test', 'cleanup'])
     218  
     219      def test_exception_in_tear_down(self):
     220          class ESC[4;38;5;81mTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     221              async def asyncSetUp(self):
     222                  events.append('asyncSetUp')
     223  
     224              async def test_func(self):
     225                  events.append('test')
     226                  self.addAsyncCleanup(self.on_cleanup)
     227  
     228              async def asyncTearDown(self):
     229                  events.append('asyncTearDown')
     230                  raise MyException()
     231  
     232              async def on_cleanup(self):
     233                  events.append('cleanup')
     234  
     235          events = []
     236          test = Test("test_func")
     237          result = test.run()
     238          self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
     239          self.assertIs(result.errors[0][0], test)
     240          self.assertIn('MyException', result.errors[0][1])
     241  
     242          events = []
     243          test = Test("test_func")
     244          self.addCleanup(test._tearDownAsyncioRunner)
     245          try:
     246              test.debug()
     247          except MyException:
     248              pass
     249          else:
     250              self.fail('Expected a MyException exception')
     251          self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown'])
     252          test.doCleanups()
     253          self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
     254  
     255      def test_exception_in_tear_clean_up(self):
     256          class ESC[4;38;5;81mTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     257              async def asyncSetUp(self):
     258                  events.append('asyncSetUp')
     259  
     260              async def test_func(self):
     261                  events.append('test')
     262                  self.addAsyncCleanup(self.on_cleanup1)
     263                  self.addAsyncCleanup(self.on_cleanup2)
     264  
     265              async def asyncTearDown(self):
     266                  events.append('asyncTearDown')
     267  
     268              async def on_cleanup1(self):
     269                  events.append('cleanup1')
     270                  raise MyException('some error')
     271  
     272              async def on_cleanup2(self):
     273                  events.append('cleanup2')
     274                  raise MyException('other error')
     275  
     276          events = []
     277          test = Test("test_func")
     278          result = test.run()
     279          self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1'])
     280          self.assertIs(result.errors[0][0], test)
     281          self.assertIn('MyException: other error', result.errors[0][1])
     282          self.assertIn('MyException: some error', result.errors[1][1])
     283  
     284          events = []
     285          test = Test("test_func")
     286          self.addCleanup(test._tearDownAsyncioRunner)
     287          try:
     288              test.debug()
     289          except MyException:
     290              pass
     291          else:
     292              self.fail('Expected a MyException exception')
     293          self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2'])
     294          test.doCleanups()
     295          self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1'])
     296  
     297      def test_deprecation_of_return_val_from_test(self):
     298          # Issue 41322 - deprecate return of value that is not None from a test
     299          class ESC[4;38;5;81mNothing:
     300              def __eq__(self, o):
     301                  return o is None
     302          class ESC[4;38;5;81mTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     303              async def test1(self):
     304                  return 1
     305              async def test2(self):
     306                  yield 1
     307              async def test3(self):
     308                  return Nothing()
     309  
     310          with self.assertWarns(DeprecationWarning) as w:
     311              Test('test1').run()
     312          self.assertIn('It is deprecated to return a value that is not None', str(w.warning))
     313          self.assertIn('test1', str(w.warning))
     314          self.assertEqual(w.filename, __file__)
     315  
     316          with self.assertWarns(DeprecationWarning) as w:
     317              Test('test2').run()
     318          self.assertIn('It is deprecated to return a value that is not None', str(w.warning))
     319          self.assertIn('test2', str(w.warning))
     320          self.assertEqual(w.filename, __file__)
     321  
     322          with self.assertWarns(DeprecationWarning) as w:
     323              Test('test3').run()
     324          self.assertIn('It is deprecated to return a value that is not None', str(w.warning))
     325          self.assertIn('test3', str(w.warning))
     326          self.assertEqual(w.filename, __file__)
     327  
     328      def test_cleanups_interleave_order(self):
     329          events = []
     330  
     331          class ESC[4;38;5;81mTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     332              async def test_func(self):
     333                  self.addAsyncCleanup(self.on_sync_cleanup, 1)
     334                  self.addAsyncCleanup(self.on_async_cleanup, 2)
     335                  self.addAsyncCleanup(self.on_sync_cleanup, 3)
     336                  self.addAsyncCleanup(self.on_async_cleanup, 4)
     337  
     338              async def on_sync_cleanup(self, val):
     339                  events.append(f'sync_cleanup {val}')
     340  
     341              async def on_async_cleanup(self, val):
     342                  events.append(f'async_cleanup {val}')
     343  
     344          test = Test("test_func")
     345          test.run()
     346          self.assertEqual(events, ['async_cleanup 4',
     347                                    'sync_cleanup 3',
     348                                    'async_cleanup 2',
     349                                    'sync_cleanup 1'])
     350  
     351      def test_base_exception_from_async_method(self):
     352          events = []
     353          class ESC[4;38;5;81mTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     354              async def test_base(self):
     355                  events.append("test_base")
     356                  raise BaseException()
     357                  events.append("not it")
     358  
     359              async def test_no_err(self):
     360                  events.append("test_no_err")
     361  
     362              async def test_cancel(self):
     363                  raise asyncio.CancelledError()
     364  
     365          test = Test("test_base")
     366          output = test.run()
     367          self.assertFalse(output.wasSuccessful())
     368  
     369          test = Test("test_no_err")
     370          test.run()
     371          self.assertEqual(events, ['test_base', 'test_no_err'])
     372  
     373          test = Test("test_cancel")
     374          output = test.run()
     375          self.assertFalse(output.wasSuccessful())
     376  
     377      def test_cancellation_hanging_tasks(self):
     378          cancelled = False
     379          class ESC[4;38;5;81mTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     380              async def test_leaking_task(self):
     381                  async def coro():
     382                      nonlocal cancelled
     383                      try:
     384                          await asyncio.sleep(1)
     385                      except asyncio.CancelledError:
     386                          cancelled = True
     387                          raise
     388  
     389                  # Leave this running in the background
     390                  asyncio.create_task(coro())
     391  
     392          test = Test("test_leaking_task")
     393          output = test.run()
     394          self.assertTrue(cancelled)
     395  
     396      def test_enterAsyncContext(self):
     397          events = []
     398  
     399          class ESC[4;38;5;81mTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     400              async def test_func(slf):
     401                  slf.addAsyncCleanup(events.append, 'cleanup1')
     402                  cm = TestCM(events, 42)
     403                  self.assertEqual(await slf.enterAsyncContext(cm), 42)
     404                  slf.addAsyncCleanup(events.append, 'cleanup2')
     405                  events.append('test')
     406  
     407          test = Test('test_func')
     408          output = test.run()
     409          self.assertTrue(output.wasSuccessful(), output)
     410          self.assertEqual(events, ['enter', 'test', 'cleanup2', 'exit', 'cleanup1'])
     411  
     412      def test_enterAsyncContext_arg_errors(self):
     413          class ESC[4;38;5;81mTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     414              async def test_func(slf):
     415                  with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
     416                      await slf.enterAsyncContext(LacksEnterAndExit())
     417                  with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
     418                      await slf.enterAsyncContext(LacksEnter())
     419                  with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
     420                      await slf.enterAsyncContext(LacksExit())
     421  
     422          test = Test('test_func')
     423          output = test.run()
     424          self.assertTrue(output.wasSuccessful())
     425  
     426      def test_debug_cleanup_same_loop(self):
     427          class ESC[4;38;5;81mTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     428              async def asyncSetUp(self):
     429                  async def coro():
     430                      await asyncio.sleep(0)
     431                  fut = asyncio.ensure_future(coro())
     432                  self.addAsyncCleanup(self.cleanup, fut)
     433                  events.append('asyncSetUp')
     434  
     435              async def test_func(self):
     436                  events.append('test')
     437                  raise MyException()
     438  
     439              async def asyncTearDown(self):
     440                  events.append('asyncTearDown')
     441  
     442              async def cleanup(self, fut):
     443                  try:
     444                      # Raises an exception if in different loop
     445                      await asyncio.wait([fut])
     446                      events.append('cleanup')
     447                  except:
     448                      import traceback
     449                      traceback.print_exc()
     450                      raise
     451  
     452          events = []
     453          test = Test("test_func")
     454          result = test.run()
     455          self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
     456          self.assertIn('MyException', result.errors[0][1])
     457  
     458          events = []
     459          test = Test("test_func")
     460          self.addCleanup(test._tearDownAsyncioRunner)
     461          try:
     462              test.debug()
     463          except MyException:
     464              pass
     465          else:
     466              self.fail('Expected a MyException exception')
     467          self.assertEqual(events, ['asyncSetUp', 'test'])
     468          test.doCleanups()
     469          self.assertEqual(events, ['asyncSetUp', 'test', 'cleanup'])
     470  
     471      def test_setup_get_event_loop(self):
     472          # See https://github.com/python/cpython/issues/95736
     473          # Make sure the default event loop is not used
     474          asyncio.set_event_loop(None)
     475  
     476          class ESC[4;38;5;81mTestCase1(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     477              def setUp(self):
     478                  asyncio.get_event_loop_policy().get_event_loop()
     479  
     480              async def test_demo1(self):
     481                  pass
     482  
     483          test = TestCase1('test_demo1')
     484          result = test.run()
     485          self.assertTrue(result.wasSuccessful())
     486  
     487  if __name__ == "__main__":
     488      unittest.main()