(root)/
Python-3.11.7/
Lib/
test/
test_audit.py
       1  """Tests for sys.audit and sys.addaudithook
       2  """
       3  
       4  import subprocess
       5  import sys
       6  import unittest
       7  from test import support
       8  from test.support import import_helper
       9  from test.support import os_helper
      10  
      11  
      12  if not hasattr(sys, "addaudithook") or not hasattr(sys, "audit"):
      13      raise unittest.SkipTest("test only relevant when sys.audit is available")
      14  
      15  AUDIT_TESTS_PY = support.findfile("audit-tests.py")
      16  
      17  
      18  class ESC[4;38;5;81mAuditTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      19      maxDiff = None
      20  
      21      @support.requires_subprocess()
      22      def do_test(self, *args):
      23          with subprocess.Popen(
      24              [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
      25              encoding="utf-8",
      26              stdout=subprocess.PIPE,
      27              stderr=subprocess.PIPE,
      28          ) as p:
      29              p.wait()
      30              sys.stdout.writelines(p.stdout)
      31              sys.stderr.writelines(p.stderr)
      32              if p.returncode:
      33                  self.fail("".join(p.stderr))
      34  
      35      @support.requires_subprocess()
      36      def run_python(self, *args):
      37          events = []
      38          with subprocess.Popen(
      39              [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
      40              encoding="utf-8",
      41              stdout=subprocess.PIPE,
      42              stderr=subprocess.PIPE,
      43          ) as p:
      44              p.wait()
      45              sys.stderr.writelines(p.stderr)
      46              return (
      47                  p.returncode,
      48                  [line.strip().partition(" ") for line in p.stdout],
      49                  "".join(p.stderr),
      50              )
      51  
      52      def test_basic(self):
      53          self.do_test("test_basic")
      54  
      55      def test_block_add_hook(self):
      56          self.do_test("test_block_add_hook")
      57  
      58      def test_block_add_hook_baseexception(self):
      59          self.do_test("test_block_add_hook_baseexception")
      60  
      61      def test_marshal(self):
      62          import_helper.import_module("marshal")
      63  
      64          self.do_test("test_marshal")
      65  
      66      def test_pickle(self):
      67          import_helper.import_module("pickle")
      68  
      69          self.do_test("test_pickle")
      70  
      71      def test_monkeypatch(self):
      72          self.do_test("test_monkeypatch")
      73  
      74      def test_open(self):
      75          self.do_test("test_open", os_helper.TESTFN)
      76  
      77      def test_cantrace(self):
      78          self.do_test("test_cantrace")
      79  
      80      def test_mmap(self):
      81          self.do_test("test_mmap")
      82  
      83      def test_excepthook(self):
      84          returncode, events, stderr = self.run_python("test_excepthook")
      85          if not returncode:
      86              self.fail(f"Expected fatal exception\n{stderr}")
      87  
      88          self.assertSequenceEqual(
      89              [("sys.excepthook", " ", "RuntimeError('fatal-error')")], events
      90          )
      91  
      92      def test_unraisablehook(self):
      93          returncode, events, stderr = self.run_python("test_unraisablehook")
      94          if returncode:
      95              self.fail(stderr)
      96  
      97          self.assertEqual(events[0][0], "sys.unraisablehook")
      98          self.assertEqual(
      99              events[0][2],
     100              "RuntimeError('nonfatal-error') Exception ignored for audit hook test",
     101          )
     102  
     103      def test_winreg(self):
     104          import_helper.import_module("winreg")
     105          returncode, events, stderr = self.run_python("test_winreg")
     106          if returncode:
     107              self.fail(stderr)
     108  
     109          self.assertEqual(events[0][0], "winreg.OpenKey")
     110          self.assertEqual(events[1][0], "winreg.OpenKey/result")
     111          expected = events[1][2]
     112          self.assertTrue(expected)
     113          self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 0"], events[2])
     114          self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 10000"], events[3])
     115          self.assertSequenceEqual(["winreg.PyHKEY.Detach", " ", expected], events[4])
     116  
     117      def test_socket(self):
     118          import_helper.import_module("socket")
     119          returncode, events, stderr = self.run_python("test_socket")
     120          if returncode:
     121              self.fail(stderr)
     122  
     123          if support.verbose:
     124              print(*events, sep='\n')
     125          self.assertEqual(events[0][0], "socket.gethostname")
     126          self.assertEqual(events[1][0], "socket.__new__")
     127          self.assertEqual(events[2][0], "socket.bind")
     128          self.assertTrue(events[2][2].endswith("('127.0.0.1', 8080)"))
     129  
     130      def test_gc(self):
     131          returncode, events, stderr = self.run_python("test_gc")
     132          if returncode:
     133              self.fail(stderr)
     134  
     135          if support.verbose:
     136              print(*events, sep='\n')
     137          self.assertEqual(
     138              [event[0] for event in events],
     139              ["gc.get_objects", "gc.get_referrers", "gc.get_referents"]
     140          )
     141  
     142  
     143      def test_http(self):
     144          import_helper.import_module("http.client")
     145          returncode, events, stderr = self.run_python("test_http_client")
     146          if returncode:
     147              self.fail(stderr)
     148  
     149          if support.verbose:
     150              print(*events, sep='\n')
     151          self.assertEqual(events[0][0], "http.client.connect")
     152          self.assertEqual(events[0][2], "www.python.org 80")
     153          self.assertEqual(events[1][0], "http.client.send")
     154          if events[1][2] != '[cannot send]':
     155              self.assertIn('HTTP', events[1][2])
     156  
     157  
     158      def test_sqlite3(self):
     159          sqlite3 = import_helper.import_module("sqlite3")
     160          returncode, events, stderr = self.run_python("test_sqlite3")
     161          if returncode:
     162              self.fail(stderr)
     163  
     164          if support.verbose:
     165              print(*events, sep='\n')
     166          actual = [ev[0] for ev in events]
     167          expected = ["sqlite3.connect", "sqlite3.connect/handle"] * 2
     168  
     169          if hasattr(sqlite3.Connection, "enable_load_extension"):
     170              expected += [
     171                  "sqlite3.enable_load_extension",
     172                  "sqlite3.load_extension",
     173              ]
     174          self.assertEqual(actual, expected)
     175  
     176  
     177      def test_sys_getframe(self):
     178          returncode, events, stderr = self.run_python("test_sys_getframe")
     179          if returncode:
     180              self.fail(stderr)
     181  
     182          if support.verbose:
     183              print(*events, sep='\n')
     184          actual = [(ev[0], ev[2]) for ev in events]
     185          expected = [("sys._getframe", "test_sys_getframe")]
     186  
     187          self.assertEqual(actual, expected)
     188  
     189      def test_syslog(self):
     190          syslog = import_helper.import_module("syslog")
     191  
     192          returncode, events, stderr = self.run_python("test_syslog")
     193          if returncode:
     194              self.fail(stderr)
     195  
     196          if support.verbose:
     197              print('Events:', *events, sep='\n  ')
     198  
     199          self.assertSequenceEqual(
     200              events,
     201              [('syslog.openlog', ' ', f'python 0 {syslog.LOG_USER}'),
     202              ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test'),
     203              ('syslog.setlogmask', ' ', f'{syslog.LOG_DEBUG}'),
     204              ('syslog.closelog', '', ''),
     205              ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test2'),
     206              ('syslog.openlog', ' ', f'audit-tests.py 0 {syslog.LOG_USER}'),
     207              ('syslog.openlog', ' ', f'audit-tests.py {syslog.LOG_NDELAY} {syslog.LOG_LOCAL0}'),
     208              ('syslog.openlog', ' ', f'None 0 {syslog.LOG_USER}'),
     209              ('syslog.closelog', '', '')]
     210          )
     211  
     212      def test_not_in_gc(self):
     213          returncode, _, stderr = self.run_python("test_not_in_gc")
     214          if returncode:
     215              self.fail(stderr)
     216  
     217  
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()