(root)/
Python-3.11.7/
Lib/
test/
test_dtrace.py
       1  import dis
       2  import os.path
       3  import re
       4  import subprocess
       5  import sys
       6  import sysconfig
       7  import types
       8  import unittest
       9  
      10  from test import support
      11  from test.support import findfile
      12  
      13  
# Every backend in this module launches an external tool through the
# subprocess module, so skip the whole module where that is unavailable.
if not support.has_subprocess_support:
    raise unittest.SkipTest("test module requires subprocess")
      16  
      17  
      18  def abspath(filename):
      19      return os.path.abspath(findfile(filename, subdir="dtracedata"))
      20  
      21  
      22  def normalize_trace_output(output):
      23      """Normalize DTrace output for comparison.
      24  
      25      DTrace keeps a per-CPU buffer, and when showing the fired probes, buffers
      26      are concatenated. So if the operating system moves our thread around, the
      27      straight result can be "non-causal". So we add timestamps to the probe
      28      firing, sort by that field, then strip it from the output"""
      29  
      30      # When compiling with '--with-pydebug', strip '[# refs]' debug output.
      31      output = re.sub(r"\[[0-9]+ refs\]", "", output)
      32      try:
      33          result = [
      34              row.split("\t")
      35              for row in output.splitlines()
      36              if row and not row.startswith('#')
      37          ]
      38          result.sort(key=lambda row: int(row[0]))
      39          result = [row[1] for row in result]
      40          return "\n".join(result)
      41      except (IndexError, ValueError):
      42          raise AssertionError(
      43              "tracer produced unparsable output:\n{}".format(output)
      44          )
      45  
      46  
      47  class ESC[4;38;5;81mTraceBackend:
      48      EXTENSION = None
      49      COMMAND = None
      50      COMMAND_ARGS = []
      51  
      52      def run_case(self, name, optimize_python=None):
      53          actual_output = normalize_trace_output(self.trace_python(
      54              script_file=abspath(name + self.EXTENSION),
      55              python_file=abspath(name + ".py"),
      56              optimize_python=optimize_python))
      57  
      58          with open(abspath(name + self.EXTENSION + ".expected")) as f:
      59              expected_output = f.read().rstrip()
      60  
      61          return (expected_output, actual_output)
      62  
      63      def generate_trace_command(self, script_file, subcommand=None):
      64          command = self.COMMAND + [script_file]
      65          if subcommand:
      66              command += ["-c", subcommand]
      67          return command
      68  
      69      def trace(self, script_file, subcommand=None):
      70          command = self.generate_trace_command(script_file, subcommand)
      71          stdout, _ = subprocess.Popen(command,
      72                                       stdout=subprocess.PIPE,
      73                                       stderr=subprocess.STDOUT,
      74                                       universal_newlines=True).communicate()
      75          return stdout
      76  
      77      def trace_python(self, script_file, python_file, optimize_python=None):
      78          python_flags = []
      79          if optimize_python:
      80              python_flags.extend(["-O"] * optimize_python)
      81          subcommand = " ".join([sys.executable] + python_flags + [python_file])
      82          return self.trace(script_file, subcommand)
      83  
      84      def assert_usable(self):
      85          try:
      86              output = self.trace(abspath("assert_usable" + self.EXTENSION))
      87              output = output.strip()
      88          except (FileNotFoundError, NotADirectoryError, PermissionError) as fnfe:
      89              output = str(fnfe)
      90          if output != "probe: success":
      91              raise unittest.SkipTest(
      92                  "{}(1) failed: {}".format(self.COMMAND[0], output)
      93              )
      94  
      95  
class DTraceBackend(TraceBackend):
    """TraceBackend that drives the ``dtrace`` command with ``.d`` scripts."""
    EXTENSION = ".d"
    # argv prefix; generate_trace_command() appends the script path after "-s".
    COMMAND = ["dtrace", "-q", "-s"]
      99  
     100  
class SystemTapBackend(TraceBackend):
    """TraceBackend that drives the ``stap`` command with ``.stp`` scripts."""
    EXTENSION = ".stp"
    # argv prefix; generate_trace_command() appends the script path after "-g".
    COMMAND = ["stap", "-g"]
     104  
     105  
     106  class ESC[4;38;5;81mTraceTests:
     107      # unittest.TestCase options
     108      maxDiff = None
     109  
     110      # TraceTests options
     111      backend = None
     112      optimize_python = 0
     113  
     114      @classmethod
     115      def setUpClass(self):
     116          self.backend.assert_usable()
     117  
     118      def run_case(self, name):
     119          actual_output, expected_output = self.backend.run_case(
     120              name, optimize_python=self.optimize_python)
     121          self.assertEqual(actual_output, expected_output)
     122  
     123      def test_function_entry_return(self):
     124          self.run_case("call_stack")
     125  
     126      def test_verify_call_opcodes(self):
     127          """Ensure our call stack test hits all function call opcodes"""
     128  
     129          opcodes = set(["CALL_FUNCTION", "CALL_FUNCTION_EX", "CALL_FUNCTION_KW"])
     130  
     131          with open(abspath("call_stack.py")) as f:
     132              code_string = f.read()
     133  
     134          def get_function_instructions(funcname):
     135              # Recompile with appropriate optimization setting
     136              code = compile(source=code_string,
     137                             filename="<string>",
     138                             mode="exec",
     139                             optimize=self.optimize_python)
     140  
     141              for c in code.co_consts:
     142                  if isinstance(c, types.CodeType) and c.co_name == funcname:
     143                      return dis.get_instructions(c)
     144              return []
     145  
     146          for instruction in get_function_instructions('start'):
     147              opcodes.discard(instruction.opname)
     148  
     149          self.assertEqual(set(), opcodes)
     150  
     151      def test_gc(self):
     152          self.run_case("gc")
     153  
     154      def test_line(self):
     155          self.run_case("line")
     156  
     157  
class DTraceNormalTests(TraceTests, unittest.TestCase):
    """TraceTests run through DTrace with no -O flag on the traced interpreter."""
    backend = DTraceBackend()
    optimize_python = 0
     161  
     162  
class DTraceOptimizedTests(TraceTests, unittest.TestCase):
    """TraceTests run through DTrace with two -O flags on the traced interpreter."""
    backend = DTraceBackend()
    optimize_python = 2
     166  
     167  
class SystemTapNormalTests(TraceTests, unittest.TestCase):
    """TraceTests run through SystemTap with no -O flag on the traced interpreter."""
    backend = SystemTapBackend()
    optimize_python = 0
     171  
     172  
class SystemTapOptimizedTests(TraceTests, unittest.TestCase):
    """TraceTests run through SystemTap with two -O flags on the traced interpreter."""
    backend = SystemTapBackend()
    optimize_python = 2
     176  
     177  class ESC[4;38;5;81mCheckDtraceProbes(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     178      @classmethod
     179      def setUpClass(cls):
     180          if sysconfig.get_config_var('WITH_DTRACE'):
     181              readelf_major_version, readelf_minor_version = cls.get_readelf_version()
     182              if support.verbose:
     183                  print(f"readelf version: {readelf_major_version}.{readelf_minor_version}")
     184          else:
     185              raise unittest.SkipTest("CPython must be configured with the --with-dtrace option.")
     186  
     187  
     188      @staticmethod
     189      def get_readelf_version():
     190          try:
     191              cmd = ["readelf", "--version"]
     192              proc = subprocess.Popen(
     193                  cmd,
     194                  stdout=subprocess.PIPE,
     195                  stderr=subprocess.PIPE,
     196                  universal_newlines=True,
     197              )
     198              with proc:
     199                  version, stderr = proc.communicate()
     200  
     201              if proc.returncode:
     202                  raise Exception(
     203                      f"Command {' '.join(cmd)!r} failed "
     204                      f"with exit code {proc.returncode}: "
     205                      f"stdout={version!r} stderr={stderr!r}"
     206                  )
     207          except OSError:
     208              raise unittest.SkipTest("Couldn't find readelf on the path")
     209  
     210          # Regex to parse:
     211          # 'GNU readelf (GNU Binutils) 2.40.0\n' -> 2.40
     212          match = re.search(r"^(?:GNU) readelf.*?\b(\d+)\.(\d+)", version)
     213          if match is None:
     214              raise unittest.SkipTest(f"Unable to parse readelf version: {version}")
     215  
     216          return int(match.group(1)), int(match.group(2))
     217  
     218      def get_readelf_output(self):
     219          command = ["readelf", "-n", sys.executable]
     220          stdout, _ = subprocess.Popen(
     221              command,
     222              stdout=subprocess.PIPE,
     223              stderr=subprocess.STDOUT,
     224              universal_newlines=True,
     225          ).communicate()
     226          return stdout
     227  
     228      def test_check_probes(self):
     229          readelf_output = self.get_readelf_output()
     230  
     231          available_probe_names = [
     232              "Name: import__find__load__done",
     233              "Name: import__find__load__start",
     234              "Name: audit",
     235              "Name: gc__start",
     236              "Name: gc__done",
     237              "Name: function__entry",
     238              "Name: function__return",
     239              "Name: line",
     240          ]
     241  
     242          for probe_name in available_probe_names:
     243              with self.subTest(probe_name=probe_name):
     244                  self.assertIn(probe_name, readelf_output)
     245  
     246  
# Run the test cases when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()