(root)/
Python-3.11.7/
Lib/
test/
libregrtest/
win_utils.py
       1  import _overlapped
       2  import _thread
       3  import _winapi
       4  import math
       5  import struct
       6  import winreg
       7  
       8  
# Seconds per measurement (the sampling period assumed by LOAD_FACTOR_1)
SAMPLING_INTERVAL = 1
# Exponential damping factor to compute exponentially weighted moving average
# on 1 minute (60 seconds).  Equal to exp(-SAMPLING_INTERVAL / 60): the weight
# kept by the previous load value each time a new sample is folded in; the
# new sample is weighted by (1 - LOAD_FACTOR_1).
LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
# Initialize the load using the arithmetic mean of the first NVALUE values
# of the Processor Queue Length
NVALUE = 5
      17  
      18  
      19  class ESC[4;38;5;81mWindowsLoadTracker():
      20      """
      21      This class asynchronously reads the performance counters to calculate
      22      the system load on Windows.  A "raw" thread is used here to prevent
      23      interference with the test suite's cases for the threading module.
      24      """
      25  
      26      def __init__(self):
      27          # Pre-flight test for access to the performance data;
      28          # `PermissionError` will be raised if not allowed
      29          winreg.QueryInfoKey(winreg.HKEY_PERFORMANCE_DATA)
      30  
      31          self._values = []
      32          self._load = None
      33          self._running = _overlapped.CreateEvent(None, True, False, None)
      34          self._stopped = _overlapped.CreateEvent(None, True, False, None)
      35  
      36          _thread.start_new_thread(self._update_load, (), {})
      37  
      38      def _update_load(self,
      39                      # localize module access to prevent shutdown errors
      40                       _wait=_winapi.WaitForSingleObject,
      41                       _signal=_overlapped.SetEvent):
      42          # run until signaled to stop
      43          while _wait(self._running, 1000):
      44              self._calculate_load()
      45          # notify stopped
      46          _signal(self._stopped)
      47  
      48      def _calculate_load(self,
      49                          # localize module access to prevent shutdown errors
      50                          _query=winreg.QueryValueEx,
      51                          _hkey=winreg.HKEY_PERFORMANCE_DATA,
      52                          _unpack=struct.unpack_from):
      53          # get the 'System' object
      54          data, _ = _query(_hkey, '2')
      55          # PERF_DATA_BLOCK {
      56          #   WCHAR Signature[4]      8 +
      57          #   DWOWD LittleEndian      4 +
      58          #   DWORD Version           4 +
      59          #   DWORD Revision          4 +
      60          #   DWORD TotalByteLength   4 +
      61          #   DWORD HeaderLength      = 24 byte offset
      62          #   ...
      63          # }
      64          obj_start, = _unpack('L', data, 24)
      65          # PERF_OBJECT_TYPE {
      66          #   DWORD TotalByteLength
      67          #   DWORD DefinitionLength
      68          #   DWORD HeaderLength
      69          #   ...
      70          # }
      71          data_start, defn_start = _unpack('4xLL', data, obj_start)
      72          data_base = obj_start + data_start
      73          defn_base = obj_start + defn_start
      74          # find the 'Processor Queue Length' counter (index=44)
      75          while defn_base < data_base:
      76              # PERF_COUNTER_DEFINITION {
      77              #   DWORD ByteLength
      78              #   DWORD CounterNameTitleIndex
      79              #   ... [7 DWORDs/28 bytes]
      80              #   DWORD CounterOffset
      81              # }
      82              size, idx, offset = _unpack('LL28xL', data, defn_base)
      83              defn_base += size
      84              if idx == 44:
      85                  counter_offset = data_base + offset
      86                  # the counter is known to be PERF_COUNTER_RAWCOUNT (DWORD)
      87                  processor_queue_length, = _unpack('L', data, counter_offset)
      88                  break
      89          else:
      90              return
      91  
      92          # We use an exponentially weighted moving average, imitating the
      93          # load calculation on Unix systems.
      94          # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
      95          # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
      96          if self._load is not None:
      97              self._load = (self._load * LOAD_FACTOR_1
      98                              + processor_queue_length  * (1.0 - LOAD_FACTOR_1))
      99          elif len(self._values) < NVALUE:
     100              self._values.append(processor_queue_length)
     101          else:
     102              self._load = sum(self._values) / len(self._values)
     103  
     104      def close(self, kill=True):
     105          self.__del__()
     106          return
     107  
     108      def __del__(self,
     109                  # localize module access to prevent shutdown errors
     110                  _wait=_winapi.WaitForSingleObject,
     111                  _close=_winapi.CloseHandle,
     112                  _signal=_overlapped.SetEvent):
     113          if self._running is not None:
     114              # tell the update thread to quit
     115              _signal(self._running)
     116              # wait for the update thread to signal done
     117              _wait(self._stopped, -1)
     118              # cleanup events
     119              _close(self._running)
     120              _close(self._stopped)
     121              self._running = self._stopped = None
     122  
     123      def getloadavg(self):
     124          return self._load