1import _overlapped 2import _thread 3import _winapi 4import math 5import struct 6import winreg 7 8 9# Seconds per measurement 10SAMPLING_INTERVAL = 1 11# Exponential damping factor to compute exponentially weighted moving average 12# on 1 minute (60 seconds) 13LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60) 14# Initialize the load using the arithmetic mean of the first NVALUE values 15# of the Processor Queue Length 16NVALUE = 5 17 18 19class WindowsLoadTracker(): 20 """ 21 This class asynchronously reads the performance counters to calculate 22 the system load on Windows. A "raw" thread is used here to prevent 23 interference with the test suite's cases for the threading module. 24 """ 25 26 def __init__(self): 27 # make __del__ not fail if pre-flight test fails 28 self._running = None 29 self._stopped = None 30 31 # Pre-flight test for access to the performance data; 32 # `PermissionError` will be raised if not allowed 33 winreg.QueryInfoKey(winreg.HKEY_PERFORMANCE_DATA) 34 35 self._values = [] 36 self._load = None 37 self._running = _overlapped.CreateEvent(None, True, False, None) 38 self._stopped = _overlapped.CreateEvent(None, True, False, None) 39 40 _thread.start_new_thread(self._update_load, (), {}) 41 42 def _update_load(self, 43 # localize module access to prevent shutdown errors 44 _wait=_winapi.WaitForSingleObject, 45 _signal=_overlapped.SetEvent): 46 # run until signaled to stop 47 while _wait(self._running, 1000): 48 self._calculate_load() 49 # notify stopped 50 _signal(self._stopped) 51 52 def _calculate_load(self, 53 # localize module access to prevent shutdown errors 54 _query=winreg.QueryValueEx, 55 _hkey=winreg.HKEY_PERFORMANCE_DATA, 56 _unpack=struct.unpack_from): 57 # get the 'System' object 58 data, _ = _query(_hkey, '2') 59 # PERF_DATA_BLOCK { 60 # WCHAR Signature[4] 8 + 61 # DWOWD LittleEndian 4 + 62 # DWORD Version 4 + 63 # DWORD Revision 4 + 64 # DWORD TotalByteLength 4 + 65 # DWORD HeaderLength = 24 byte offset 66 # ... 67 # } 68 obj_start, = _unpack('L', data, 24) 69 # PERF_OBJECT_TYPE { 70 # DWORD TotalByteLength 71 # DWORD DefinitionLength 72 # DWORD HeaderLength 73 # ... 74 # } 75 data_start, defn_start = _unpack('4xLL', data, obj_start) 76 data_base = obj_start + data_start 77 defn_base = obj_start + defn_start 78 # find the 'Processor Queue Length' counter (index=44) 79 while defn_base < data_base: 80 # PERF_COUNTER_DEFINITION { 81 # DWORD ByteLength 82 # DWORD CounterNameTitleIndex 83 # ... [7 DWORDs/28 bytes] 84 # DWORD CounterOffset 85 # } 86 size, idx, offset = _unpack('LL28xL', data, defn_base) 87 defn_base += size 88 if idx == 44: 89 counter_offset = data_base + offset 90 # the counter is known to be PERF_COUNTER_RAWCOUNT (DWORD) 91 processor_queue_length, = _unpack('L', data, counter_offset) 92 break 93 else: 94 return 95 96 # We use an exponentially weighted moving average, imitating the 97 # load calculation on Unix systems. 98 # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation 99 # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average 100 if self._load is not None: 101 self._load = (self._load * LOAD_FACTOR_1 102 + processor_queue_length * (1.0 - LOAD_FACTOR_1)) 103 elif len(self._values) < NVALUE: 104 self._values.append(processor_queue_length) 105 else: 106 self._load = sum(self._values) / len(self._values) 107 108 def close(self, kill=True): 109 self.__del__() 110 return 111 112 def __del__(self, 113 # localize module access to prevent shutdown errors 114 _wait=_winapi.WaitForSingleObject, 115 _close=_winapi.CloseHandle, 116 _signal=_overlapped.SetEvent): 117 if self._running is not None: 118 # tell the update thread to quit 119 _signal(self._running) 120 # wait for the update thread to signal done 121 _wait(self._stopped, -1) 122 # cleanup events 123 _close(self._running) 124 _close(self._stopped) 125 self._running = self._stopped = None 126 127 def getloadavg(self): 128 return self._load 129