1import _winapi 2import math 3import msvcrt 4import os 5import subprocess 6import uuid 7import winreg 8from test.support import os_helper 9from test.libregrtest.utils import print_warning 10 11 12# Max size of asynchronous reads 13BUFSIZE = 8192 14# Seconds per measurement 15SAMPLING_INTERVAL = 1 16# Exponential damping factor to compute exponentially weighted moving average 17# on 1 minute (60 seconds) 18LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60) 19# Initialize the load using the arithmetic mean of the first NVALUE values 20# of the Processor Queue Length 21NVALUE = 5 22# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names 23# of typeperf are registered 24COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion" 25 r"\Perflib\CurrentLanguage") 26 27 28class WindowsLoadTracker(): 29 """ 30 This class asynchronously interacts with the `typeperf` command to read 31 the system load on Windows. Multiprocessing and threads can't be used 32 here because they interfere with the test suite's cases for those 33 modules. 34 """ 35 36 def __init__(self): 37 self._values = [] 38 self._load = None 39 self._buffer = '' 40 self._popen = None 41 self.start() 42 43 def start(self): 44 # Create a named pipe which allows for asynchronous IO in Windows 45 pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4()) 46 47 open_mode = _winapi.PIPE_ACCESS_INBOUND 48 open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE 49 open_mode |= _winapi.FILE_FLAG_OVERLAPPED 50 51 # This is the read end of the pipe, where we will be grabbing output 52 self.pipe = _winapi.CreateNamedPipe( 53 pipe_name, open_mode, _winapi.PIPE_WAIT, 54 1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL 55 ) 56 # The write end of the pipe which is passed to the created process 57 pipe_write_end = _winapi.CreateFile( 58 pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL, 59 _winapi.OPEN_EXISTING, 0, _winapi.NULL 60 ) 61 # Open up the handle as a python file object so we can pass it to 62 # subprocess 63 command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0) 64 65 # Connect to the read end of the pipe in overlap/async mode 66 overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True) 67 overlap.GetOverlappedResult(True) 68 69 # Spawn off the load monitor 70 counter_name = self._get_counter_name() 71 command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)] 72 self._popen = subprocess.Popen(' '.join(command), 73 stdout=command_stdout, 74 cwd=os_helper.SAVEDCWD) 75 76 # Close our copy of the write end of the pipe 77 os.close(command_stdout) 78 79 def _get_counter_name(self): 80 # accessing the registry to get the counter localization name 81 with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey: 82 counters = winreg.QueryValueEx(perfkey, 'Counter')[0] 83 84 # Convert [key1, value1, key2, value2, ...] list 85 # to {key1: value1, key2: value2, ...} dict 86 counters = iter(counters) 87 counters_dict = dict(zip(counters, counters)) 88 89 # System counter has key '2' and Processor Queue Length has key '44' 90 system = counters_dict['2'] 91 process_queue_length = counters_dict['44'] 92 return f'"\\{system}\\{process_queue_length}"' 93 94 def close(self, kill=True): 95 if self._popen is None: 96 return 97 98 self._load = None 99 100 if kill: 101 self._popen.kill() 102 self._popen.wait() 103 self._popen = None 104 105 def __del__(self): 106 self.close() 107 108 def _parse_line(self, line): 109 # typeperf outputs in a CSV format like this: 110 # "07/19/2018 01:32:26.605","3.000000" 111 # (date, process queue length) 112 tokens = line.split(',') 113 if len(tokens) != 2: 114 raise ValueError 115 116 value = tokens[1] 117 if not value.startswith('"') or not value.endswith('"'): 118 raise ValueError 119 value = value[1:-1] 120 return float(value) 121 122 def _read_lines(self): 123 overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True) 124 bytes_read, res = overlapped.GetOverlappedResult(False) 125 if res != 0: 126 return () 127 128 output = overlapped.getbuffer() 129 output = output.decode('oem', 'replace') 130 output = self._buffer + output 131 lines = output.splitlines(True) 132 133 # bpo-36670: typeperf only writes a newline *before* writing a value, 134 # not after. Sometimes, the written line in incomplete (ex: only 135 # timestamp, without the process queue length). Only pass the last line 136 # to the parser if it's a valid value, otherwise store it in 137 # self._buffer. 138 try: 139 self._parse_line(lines[-1]) 140 except ValueError: 141 self._buffer = lines.pop(-1) 142 else: 143 self._buffer = '' 144 145 return lines 146 147 def getloadavg(self): 148 if self._popen is None: 149 return None 150 151 returncode = self._popen.poll() 152 if returncode is not None: 153 self.close(kill=False) 154 return None 155 156 try: 157 lines = self._read_lines() 158 except BrokenPipeError: 159 self.close() 160 return None 161 162 for line in lines: 163 line = line.rstrip() 164 165 # Ignore the initial header: 166 # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length" 167 if 'PDH-CSV' in line: 168 continue 169 170 # Ignore blank lines 171 if not line: 172 continue 173 174 try: 175 processor_queue_length = self._parse_line(line) 176 except ValueError: 177 print_warning("Failed to parse typeperf output: %a" % line) 178 continue 179 180 # We use an exponentially weighted moving average, imitating the 181 # load calculation on Unix systems. 182 # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation 183 # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average 184 if self._load is not None: 185 self._load = (self._load * LOAD_FACTOR_1 186 + processor_queue_length * (1.0 - LOAD_FACTOR_1)) 187 elif len(self._values) < NVALUE: 188 self._values.append(processor_queue_length) 189 else: 190 self._load = sum(self._values) / len(self._values) 191 192 return self._load 193