1 import _overlapped
2 import _thread
3 import _winapi
4 import math
5 import struct
6 import winreg
7
8
9 # Seconds per measurement
10 SAMPLING_INTERVAL = 1
11 # Exponential damping factor to compute exponentially weighted moving average
12 # on 1 minute (60 seconds)
13 LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
14 # Initialize the load using the arithmetic mean of the first NVALUE values
15 # of the Processor Queue Length
16 NVALUE = 5
17
18
19 class ESC[4;38;5;81mWindowsLoadTracker():
20 """
21 This class asynchronously reads the performance counters to calculate
22 the system load on Windows. A "raw" thread is used here to prevent
23 interference with the test suite's cases for the threading module.
24 """
25
26 def __init__(self):
27 # Pre-flight test for access to the performance data;
28 # `PermissionError` will be raised if not allowed
29 winreg.QueryInfoKey(winreg.HKEY_PERFORMANCE_DATA)
30
31 self._values = []
32 self._load = None
33 self._running = _overlapped.CreateEvent(None, True, False, None)
34 self._stopped = _overlapped.CreateEvent(None, True, False, None)
35
36 _thread.start_new_thread(self._update_load, (), {})
37
38 def _update_load(self,
39 # localize module access to prevent shutdown errors
40 _wait=_winapi.WaitForSingleObject,
41 _signal=_overlapped.SetEvent):
42 # run until signaled to stop
43 while _wait(self._running, 1000):
44 self._calculate_load()
45 # notify stopped
46 _signal(self._stopped)
47
48 def _calculate_load(self,
49 # localize module access to prevent shutdown errors
50 _query=winreg.QueryValueEx,
51 _hkey=winreg.HKEY_PERFORMANCE_DATA,
52 _unpack=struct.unpack_from):
53 # get the 'System' object
54 data, _ = _query(_hkey, '2')
55 # PERF_DATA_BLOCK {
56 # WCHAR Signature[4] 8 +
57 # DWOWD LittleEndian 4 +
58 # DWORD Version 4 +
59 # DWORD Revision 4 +
60 # DWORD TotalByteLength 4 +
61 # DWORD HeaderLength = 24 byte offset
62 # ...
63 # }
64 obj_start, = _unpack('L', data, 24)
65 # PERF_OBJECT_TYPE {
66 # DWORD TotalByteLength
67 # DWORD DefinitionLength
68 # DWORD HeaderLength
69 # ...
70 # }
71 data_start, defn_start = _unpack('4xLL', data, obj_start)
72 data_base = obj_start + data_start
73 defn_base = obj_start + defn_start
74 # find the 'Processor Queue Length' counter (index=44)
75 while defn_base < data_base:
76 # PERF_COUNTER_DEFINITION {
77 # DWORD ByteLength
78 # DWORD CounterNameTitleIndex
79 # ... [7 DWORDs/28 bytes]
80 # DWORD CounterOffset
81 # }
82 size, idx, offset = _unpack('LL28xL', data, defn_base)
83 defn_base += size
84 if idx == 44:
85 counter_offset = data_base + offset
86 # the counter is known to be PERF_COUNTER_RAWCOUNT (DWORD)
87 processor_queue_length, = _unpack('L', data, counter_offset)
88 break
89 else:
90 return
91
92 # We use an exponentially weighted moving average, imitating the
93 # load calculation on Unix systems.
94 # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
95 # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
96 if self._load is not None:
97 self._load = (self._load * LOAD_FACTOR_1
98 + processor_queue_length * (1.0 - LOAD_FACTOR_1))
99 elif len(self._values) < NVALUE:
100 self._values.append(processor_queue_length)
101 else:
102 self._load = sum(self._values) / len(self._values)
103
104 def close(self, kill=True):
105 self.__del__()
106 return
107
108 def __del__(self,
109 # localize module access to prevent shutdown errors
110 _wait=_winapi.WaitForSingleObject,
111 _close=_winapi.CloseHandle,
112 _signal=_overlapped.SetEvent):
113 if self._running is not None:
114 # tell the update thread to quit
115 _signal(self._running)
116 # wait for the update thread to signal done
117 _wait(self._stopped, -1)
118 # cleanup events
119 _close(self._running)
120 _close(self._stopped)
121 self._running = self._stopped = None
122
123 def getloadavg(self):
124 return self._load