(root)/
Python-3.12.0/
Lib/
test/
test_perf_profiler.py
       1  import unittest
       2  import string
       3  import subprocess
       4  import sys
       5  import sysconfig
       6  import os
       7  import pathlib
       8  from test import support
       9  from test.support.script_helper import (
      10      make_script,
      11      assert_python_failure,
      12      assert_python_ok,
      13  )
      14  from test.support.os_helper import temp_dir
      15  
      16  
      17  if not support.has_subprocess_support:
      18      raise unittest.SkipTest("test module requires subprocess")
      19  
      20  
      21  def supports_trampoline_profiling():
      22      perf_trampoline = sysconfig.get_config_var("PY_HAVE_PERF_TRAMPOLINE")
      23      if not perf_trampoline:
      24          return False
      25      return int(perf_trampoline) == 1
      26  
      27  
      28  if not supports_trampoline_profiling():
      29      raise unittest.SkipTest("perf trampoline profiling not supported")
      30  
      31  
      32  class ESC[4;38;5;81mTestPerfTrampoline(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      33      def setUp(self):
      34          super().setUp()
      35          self.perf_files = set(pathlib.Path("/tmp/").glob("perf-*.map"))
      36  
      37      def tearDown(self) -> None:
      38          super().tearDown()
      39          files_to_delete = (
      40              set(pathlib.Path("/tmp/").glob("perf-*.map")) - self.perf_files
      41          )
      42          for file in files_to_delete:
      43              file.unlink()
      44  
      45      def test_trampoline_works(self):
      46          code = """if 1:
      47                  def foo():
      48                      pass
      49  
      50                  def bar():
      51                      foo()
      52  
      53                  def baz():
      54                      bar()
      55  
      56                  baz()
      57                  """
      58          with temp_dir() as script_dir:
      59              script = make_script(script_dir, "perftest", code)
      60              with subprocess.Popen(
      61                  [sys.executable, "-Xperf", script],
      62                  text=True,
      63                  stderr=subprocess.PIPE,
      64                  stdout=subprocess.PIPE,
      65              ) as process:
      66                  stdout, stderr = process.communicate()
      67  
      68          self.assertEqual(stderr, "")
      69          self.assertEqual(stdout, "")
      70  
      71          perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
      72          self.assertTrue(perf_file.exists())
      73          perf_file_contents = perf_file.read_text()
      74          perf_lines = perf_file_contents.splitlines();
      75          expected_symbols = [f"py::foo:{script}", f"py::bar:{script}", f"py::baz:{script}"]
      76          for expected_symbol in expected_symbols:
      77              perf_line = next((line for line in perf_lines if expected_symbol in line), None)
      78              self.assertIsNotNone(perf_line, f"Could not find {expected_symbol} in perf file")
      79              perf_addr = perf_line.split(" ")[0]
      80              self.assertFalse(perf_addr.startswith("0x"), "Address should not be prefixed with 0x")
      81              self.assertTrue(set(perf_addr).issubset(string.hexdigits), "Address should contain only hex characters")
      82  
      83      def test_trampoline_works_with_forks(self):
      84          code = """if 1:
      85                  import os, sys
      86  
      87                  def foo_fork():
      88                      pass
      89  
      90                  def bar_fork():
      91                      foo_fork()
      92  
      93                  def baz_fork():
      94                      bar_fork()
      95  
      96                  def foo():
      97                      pid = os.fork()
      98                      if pid == 0:
      99                          print(os.getpid())
     100                          baz_fork()
     101                      else:
     102                          _, status = os.waitpid(-1, 0)
     103                          sys.exit(status)
     104  
     105                  def bar():
     106                      foo()
     107  
     108                  def baz():
     109                      bar()
     110  
     111                  baz()
     112                  """
     113          with temp_dir() as script_dir:
     114              script = make_script(script_dir, "perftest", code)
     115              with subprocess.Popen(
     116                  [sys.executable, "-Xperf", script],
     117                  text=True,
     118                  stderr=subprocess.PIPE,
     119                  stdout=subprocess.PIPE,
     120              ) as process:
     121                  stdout, stderr = process.communicate()
     122  
     123          self.assertEqual(process.returncode, 0)
     124          self.assertEqual(stderr, "")
     125          child_pid = int(stdout.strip())
     126          perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
     127          perf_child_file = pathlib.Path(f"/tmp/perf-{child_pid}.map")
     128          self.assertTrue(perf_file.exists())
     129          self.assertTrue(perf_child_file.exists())
     130  
     131          perf_file_contents = perf_file.read_text()
     132          self.assertIn(f"py::foo:{script}", perf_file_contents)
     133          self.assertIn(f"py::bar:{script}", perf_file_contents)
     134          self.assertIn(f"py::baz:{script}", perf_file_contents)
     135  
     136          child_perf_file_contents = perf_child_file.read_text()
     137          self.assertIn(f"py::foo_fork:{script}", child_perf_file_contents)
     138          self.assertIn(f"py::bar_fork:{script}", child_perf_file_contents)
     139          self.assertIn(f"py::baz_fork:{script}", child_perf_file_contents)
     140  
     141      def test_sys_api(self):
     142          code = """if 1:
     143                  import sys
     144                  def foo():
     145                      pass
     146  
     147                  def spam():
     148                      pass
     149  
     150                  def bar():
     151                      sys.deactivate_stack_trampoline()
     152                      foo()
     153                      sys.activate_stack_trampoline("perf")
     154                      spam()
     155  
     156                  def baz():
     157                      bar()
     158  
     159                  sys.activate_stack_trampoline("perf")
     160                  baz()
     161                  """
     162          with temp_dir() as script_dir:
     163              script = make_script(script_dir, "perftest", code)
     164              with subprocess.Popen(
     165                  [sys.executable, script],
     166                  text=True,
     167                  stderr=subprocess.PIPE,
     168                  stdout=subprocess.PIPE,
     169              ) as process:
     170                  stdout, stderr = process.communicate()
     171  
     172          self.assertEqual(stderr, "")
     173          self.assertEqual(stdout, "")
     174  
     175          perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
     176          self.assertTrue(perf_file.exists())
     177          perf_file_contents = perf_file.read_text()
     178          self.assertNotIn(f"py::foo:{script}", perf_file_contents)
     179          self.assertIn(f"py::spam:{script}", perf_file_contents)
     180          self.assertIn(f"py::bar:{script}", perf_file_contents)
     181          self.assertIn(f"py::baz:{script}", perf_file_contents)
     182  
     183      def test_sys_api_with_existing_trampoline(self):
     184          code = """if 1:
     185                  import sys
     186                  sys.activate_stack_trampoline("perf")
     187                  sys.activate_stack_trampoline("perf")
     188                  """
     189          assert_python_ok("-c", code)
     190  
     191      def test_sys_api_with_invalid_trampoline(self):
     192          code = """if 1:
     193                  import sys
     194                  sys.activate_stack_trampoline("invalid")
     195                  """
     196          rc, out, err = assert_python_failure("-c", code)
     197          self.assertIn("invalid backend: invalid", err.decode())
     198  
     199      def test_sys_api_get_status(self):
     200          code = """if 1:
     201                  import sys
     202                  sys.activate_stack_trampoline("perf")
     203                  assert sys.is_stack_trampoline_active() is True
     204                  sys.deactivate_stack_trampoline()
     205                  assert sys.is_stack_trampoline_active() is False
     206                  """
     207          assert_python_ok("-c", code)
     208  
     209  
     210  def is_unwinding_reliable():
     211      cflags = sysconfig.get_config_var("PY_CORE_CFLAGS")
     212      if not cflags:
     213          return False
     214      return "no-omit-frame-pointer" in cflags
     215  
     216  
     217  def perf_command_works():
     218      try:
     219          cmd = ["perf", "--help"]
     220          stdout = subprocess.check_output(cmd, text=True)
     221      except (subprocess.SubprocessError, OSError):
     222          return False
     223  
     224      # perf version does not return a version number on Fedora. Use presence
     225      # of "perf.data" in help as indicator that it's perf from Linux tools.
     226      if "perf.data" not in stdout:
     227          return False
     228  
     229      # Check that we can run a simple perf run
     230      with temp_dir() as script_dir:
     231          try:
     232              output_file = script_dir + "/perf_output.perf"
     233              cmd = (
     234                  "perf",
     235                  "record",
     236                  "-g",
     237                  "--call-graph=fp",
     238                  "-o",
     239                  output_file,
     240                  "--",
     241                  sys.executable,
     242                  "-c",
     243                  'print("hello")',
     244              )
     245              stdout = subprocess.check_output(
     246                  cmd, cwd=script_dir, text=True, stderr=subprocess.STDOUT
     247              )
     248          except (subprocess.SubprocessError, OSError):
     249              return False
     250  
     251          if "hello" not in stdout:
     252              return False
     253  
     254      return True
     255  
     256  
     257  def run_perf(cwd, *args, **env_vars):
     258      if env_vars:
     259          env = os.environ.copy()
     260          env.update(env_vars)
     261      else:
     262          env = None
     263      output_file = cwd + "/perf_output.perf"
     264      base_cmd = ("perf", "record", "-g", "--call-graph=fp", "-o", output_file, "--")
     265      proc = subprocess.run(
     266          base_cmd + args,
     267          stdout=subprocess.PIPE,
     268          stderr=subprocess.PIPE,
     269          env=env,
     270      )
     271      if proc.returncode:
     272          print(proc.stderr)
     273          raise ValueError(f"Perf failed with return code {proc.returncode}")
     274  
     275      base_cmd = ("perf", "script")
     276      proc = subprocess.run(
     277          ("perf", "script", "-i", output_file),
     278          stdout=subprocess.PIPE,
     279          stderr=subprocess.PIPE,
     280          env=env,
     281          check=True,
     282      )
     283      return proc.stdout.decode("utf-8", "replace"), proc.stderr.decode(
     284          "utf-8", "replace"
     285      )
     286  
     287  
     288  @unittest.skipUnless(perf_command_works(), "perf command doesn't work")
     289  @unittest.skipUnless(is_unwinding_reliable(), "Unwinding is unreliable")
     290  @support.skip_if_sanitizer(address=True, memory=True, ub=True)
     291  class ESC[4;38;5;81mTestPerfProfiler(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     292      def setUp(self):
     293          super().setUp()
     294          self.perf_files = set(pathlib.Path("/tmp/").glob("perf-*.map"))
     295  
     296      def tearDown(self) -> None:
     297          super().tearDown()
     298          files_to_delete = (
     299              set(pathlib.Path("/tmp/").glob("perf-*.map")) - self.perf_files
     300          )
     301          for file in files_to_delete:
     302              file.unlink()
     303  
     304      def test_python_calls_appear_in_the_stack_if_perf_activated(self):
     305          with temp_dir() as script_dir:
     306              code = """if 1:
     307                  def foo(n):
     308                      x = 0
     309                      for i in range(n):
     310                          x += i
     311  
     312                  def bar(n):
     313                      foo(n)
     314  
     315                  def baz(n):
     316                      bar(n)
     317  
     318                  baz(10000000)
     319                  """
     320              script = make_script(script_dir, "perftest", code)
     321              stdout, stderr = run_perf(script_dir, sys.executable, "-Xperf", script)
     322              self.assertEqual(stderr, "")
     323  
     324              self.assertIn(f"py::foo:{script}", stdout)
     325              self.assertIn(f"py::bar:{script}", stdout)
     326              self.assertIn(f"py::baz:{script}", stdout)
     327  
     328      def test_python_calls_do_not_appear_in_the_stack_if_perf_activated(self):
     329          with temp_dir() as script_dir:
     330              code = """if 1:
     331                  def foo(n):
     332                      x = 0
     333                      for i in range(n):
     334                          x += i
     335  
     336                  def bar(n):
     337                      foo(n)
     338  
     339                  def baz(n):
     340                      bar(n)
     341  
     342                  baz(10000000)
     343                  """
     344              script = make_script(script_dir, "perftest", code)
     345              stdout, stderr = run_perf(script_dir, sys.executable, script)
     346              self.assertEqual(stderr, "")
     347  
     348              self.assertNotIn(f"py::foo:{script}", stdout)
     349              self.assertNotIn(f"py::bar:{script}", stdout)
     350              self.assertNotIn(f"py::baz:{script}", stdout)
     351  
     352  
     353  if __name__ == "__main__":
     354      unittest.main()