(root)/
Python-3.12.0/
Lib/
test/
test_wmi.py
       1  # Test the internal _wmi module on Windows
       2  # This is used by the platform module, and potentially others
       3  
       4  import unittest
       5  from test.support import import_helper, requires_resource
       6  
       7  
       8  # Do this first so test will be skipped if module doesn't exist
       9  _wmi = import_helper.import_module('_wmi', required_on=['win'])
      10  
      11  
      12  class ESC[4;38;5;81mWmiTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      13      def test_wmi_query_os_version(self):
      14          r = _wmi.exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0")
      15          self.assertEqual(1, len(r))
      16          k, eq, v = r[0].partition("=")
      17          self.assertEqual("=", eq, r[0])
      18          self.assertEqual("Version", k, r[0])
      19          # Best we can check for the version is that it's digits, dot, digits, anything
      20          # Otherwise, we are likely checking the result of the query against itself
      21          self.assertRegex(v, r"\d+\.\d+.+$", r[0])
      22  
      23      def test_wmi_query_repeated(self):
      24          # Repeated queries should not break
      25          for _ in range(10):
      26              self.test_wmi_query_os_version()
      27  
      28      def test_wmi_query_error(self):
      29          # Invalid queries fail with OSError
      30          try:
      31              _wmi.exec_query("SELECT InvalidColumnName FROM InvalidTableName")
      32          except OSError as ex:
      33              if ex.winerror & 0xFFFFFFFF == 0x80041010:
      34                  # This is the expected error code. All others should fail the test
      35                  return
      36          self.fail("Expected OSError")
      37  
      38      def test_wmi_query_repeated_error(self):
      39          for _ in range(10):
      40              self.test_wmi_query_error()
      41  
      42      def test_wmi_query_not_select(self):
      43          # Queries other than SELECT are blocked to avoid potential exploits
      44          with self.assertRaises(ValueError):
      45              _wmi.exec_query("not select, just in case someone tries something")
      46  
      47      @requires_resource('cpu')
      48      def test_wmi_query_overflow(self):
      49          # Ensure very big queries fail
      50          # Test multiple times to ensure consistency
      51          for _ in range(2):
      52              with self.assertRaises(OSError):
      53                  _wmi.exec_query("SELECT * FROM CIM_DataFile")
      54  
      55      def test_wmi_query_multiple_rows(self):
      56          # Multiple instances should have an extra null separator
      57          r = _wmi.exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000")
      58          self.assertFalse(r.startswith("\0"), r)
      59          self.assertFalse(r.endswith("\0"), r)
      60          it = iter(r.split("\0"))
      61          try:
      62              while True:
      63                  self.assertRegex(next(it), r"ProcessId=\d+")
      64                  self.assertEqual("", next(it))
      65          except StopIteration:
      66              pass
      67  
      68      def test_wmi_query_threads(self):
      69          from concurrent.futures import ThreadPoolExecutor
      70          query = "SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000"
      71          with ThreadPoolExecutor(4) as pool:
      72              task = [pool.submit(_wmi.exec_query, query) for _ in range(32)]
      73              for t in task:
      74                  self.assertRegex(t.result(), "ProcessId=")