1 # Test the internal _wmi module on Windows
2 # This is used by the platform module, and potentially others
3
4 import unittest
5 from test.support import import_helper, requires_resource
6
7
8 # Do this first so test will be skipped if module doesn't exist
9 _wmi = import_helper.import_module('_wmi', required_on=['win'])
10
11
class WmiTests(unittest.TestCase):
    """Exercise ``_wmi.exec_query``.

    As the assertions below rely on, a successful query returns one string
    made of ``Name=Value`` fields separated by NUL characters, with an extra
    empty field between consecutive result rows.
    """

    def test_wmi_query_os_version(self):
        # A single-column query on a single-instance class yields exactly
        # one "Version=<value>" field.
        fields = _wmi.exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0")
        self.assertEqual(1, len(fields))
        field = fields[0]
        key, sep, value = field.partition("=")
        self.assertEqual("=", sep, field)
        self.assertEqual("Version", key, field)
        # Best we can check for the version is that it's digits, dot, digits, anything
        # Otherwise, we are likely checking the result of the query against itself
        self.assertRegex(value, r"\d+\.\d+.+$", field)

    def test_wmi_query_repeated(self):
        # Repeated queries should not break
        for _ in range(10):
            self.test_wmi_query_os_version()

    def test_wmi_query_error(self):
        # Invalid queries fail with OSError
        with self.assertRaises(OSError) as caught:
            _wmi.exec_query("SELECT InvalidColumnName FROM InvalidTableName")
        # Mask to 32 bits so the comparison does not depend on whether the
        # error code is surfaced as a signed or an unsigned value.
        code = caught.exception.winerror & 0xFFFFFFFF
        # 0x80041010 (WBEM_E_INVALID_CLASS in the WMI error constants) is the
        # only acceptable code; any other one fails the test and is reported.
        self.assertEqual(0x80041010, code, f"unexpected winerror 0x{code:08X}")

    def test_wmi_query_repeated_error(self):
        # A failing query must keep failing the same way when repeated
        for _ in range(10):
            self.test_wmi_query_error()

    def test_wmi_query_not_select(self):
        # Queries other than SELECT are blocked to avoid potential exploits
        with self.assertRaises(ValueError):
            _wmi.exec_query("not select, just in case someone tries something")

    @requires_resource('cpu')
    def test_wmi_query_overflow(self):
        # Ensure very big queries fail
        # Test multiple times to ensure consistency
        for _ in range(2):
            with self.assertRaises(OSError):
                _wmi.exec_query("SELECT * FROM CIM_DataFile")

    def test_wmi_query_multiple_rows(self):
        # Multiple instances should have an extra null separator
        r = _wmi.exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000")
        self.assertFalse(r.startswith("\0"), r)
        self.assertFalse(r.endswith("\0"), r)
        # Splitting on NUL must therefore alternate between a
        # "ProcessId=<n>" field (even index) and the empty field produced by
        # the doubled separator between rows (odd index).
        for index, field in enumerate(r.split("\0")):
            if index % 2 == 0:
                self.assertRegex(field, r"ProcessId=\d+")
            else:
                self.assertEqual("", field)

    def test_wmi_query_threads(self):
        # Concurrent queries from several worker threads must all succeed
        from concurrent.futures import ThreadPoolExecutor
        query = "SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000"
        with ThreadPoolExecutor(4) as pool:
            futures = [pool.submit(_wmi.exec_query, query) for _ in range(32)]
            for future in futures:
                # result() re-raises any exception raised in the worker
                self.assertRegex(future.result(), "ProcessId=")