1import _winapi 2import math 3import msvcrt 4import os 5import subprocess 6import uuid 7import winreg 8from test import support 9from test.libregrtest.utils import print_warning 10 11 12# Max size of asynchronous reads 13BUFSIZE = 8192 14# Seconds per measurement 15SAMPLING_INTERVAL = 1 16# Exponential damping factor to compute exponentially weighted moving average 17# on 1 minute (60 seconds) 18LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60) 19# Initialize the load using the arithmetic mean of the first NVALUE values 20# of the Processor Queue Length 21NVALUE = 5 22# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names 23# of typeperf are registered 24COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion" 25 r"\Perflib\CurrentLanguage") 26 27 28class WindowsLoadTracker(): 29 """ 30 This class asynchronously interacts with the `typeperf` command to read 31 the system load on Windows. Multiprocessing and threads can't be used 32 here because they interfere with the test suite's cases for those 33 modules. 34 """ 35 36 def __init__(self): 37 self._values = [] 38 self._load = None 39 self._buffer = '' 40 self._popen = None 41 self.start() 42 43 def start(self): 44 # Create a named pipe which allows for asynchronous IO in Windows 45 pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4()) 46 47 open_mode = _winapi.PIPE_ACCESS_INBOUND 48 open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE 49 open_mode |= _winapi.FILE_FLAG_OVERLAPPED 50 51 # This is the read end of the pipe, where we will be grabbing output 52 self.pipe = _winapi.CreateNamedPipe( 53 pipe_name, open_mode, _winapi.PIPE_WAIT, 54 1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL 55 ) 56 # The write end of the pipe which is passed to the created process 57 pipe_write_end = _winapi.CreateFile( 58 pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL, 59 _winapi.OPEN_EXISTING, 0, _winapi.NULL 60 ) 61 # Open up the handle as a python file object so we can pass it to 62 # subprocess 63 command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0) 64 65 # Connect to the read end of the pipe in overlap/async mode 66 overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True) 67 overlap.GetOverlappedResult(True) 68 69 # Spawn off the load monitor 70 counter_name = self._get_counter_name() 71 command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)] 72 self._popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD) 73 74 # Close our copy of the write end of the pipe 75 os.close(command_stdout) 76 77 def _get_counter_name(self): 78 # accessing the registry to get the counter localization name 79 with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey: 80 counters = winreg.QueryValueEx(perfkey, 'Counter')[0] 81 82 # Convert [key1, value1, key2, value2, ...] list 83 # to {key1: value1, key2: value2, ...} dict 84 counters = iter(counters) 85 counters_dict = dict(zip(counters, counters)) 86 87 # System counter has key '2' and Processor Queue Length has key '44' 88 system = counters_dict['2'] 89 process_queue_length = counters_dict['44'] 90 return f'"\\{system}\\{process_queue_length}"' 91 92 def close(self, kill=True): 93 if self._popen is None: 94 return 95 96 self._load = None 97 98 if kill: 99 self._popen.kill() 100 self._popen.wait() 101 self._popen = None 102 103 def __del__(self): 104 self.close() 105 106 def _parse_line(self, line): 107 # typeperf outputs in a CSV format like this: 108 # "07/19/2018 01:32:26.605","3.000000" 109 # (date, process queue length) 110 tokens = line.split(',') 111 if len(tokens) != 2: 112 raise ValueError 113 114 value = tokens[1] 115 if not value.startswith('"') or not value.endswith('"'): 116 raise ValueError 117 value = value[1:-1] 118 return float(value) 119 120 def _read_lines(self): 121 overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True) 122 bytes_read, res = overlapped.GetOverlappedResult(False) 123 if res != 0: 124 return () 125 126 output = overlapped.getbuffer() 127 output = output.decode('oem', 'replace') 128 output = self._buffer + output 129 lines = output.splitlines(True) 130 131 # bpo-36670: typeperf only writes a newline *before* writing a value, 132 # not after. Sometimes, the written line in incomplete (ex: only 133 # timestamp, without the process queue length). Only pass the last line 134 # to the parser if it's a valid value, otherwise store it in 135 # self._buffer. 136 try: 137 self._parse_line(lines[-1]) 138 except ValueError: 139 self._buffer = lines.pop(-1) 140 else: 141 self._buffer = '' 142 143 return lines 144 145 def getloadavg(self): 146 if self._popen is None: 147 return None 148 149 returncode = self._popen.poll() 150 if returncode is not None: 151 self.close(kill=False) 152 return None 153 154 try: 155 lines = self._read_lines() 156 except BrokenPipeError: 157 self.close() 158 return None 159 160 for line in lines: 161 line = line.rstrip() 162 163 # Ignore the initial header: 164 # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length" 165 if 'PDH-CSV' in line: 166 continue 167 168 # Ignore blank lines 169 if not line: 170 continue 171 172 try: 173 processor_queue_length = self._parse_line(line) 174 except ValueError: 175 print_warning("Failed to parse typeperf output: %a" % line) 176 continue 177 178 # We use an exponentially weighted moving average, imitating the 179 # load calculation on Unix systems. 180 # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation 181 # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average 182 if self._load is not None: 183 self._load = (self._load * LOAD_FACTOR_1 184 + processor_queue_length * (1.0 - LOAD_FACTOR_1)) 185 elif len(self._values) < NVALUE: 186 self._values.append(processor_queue_length) 187 else: 188 self._load = sum(self._values) / len(self._values) 189 190 return self._load 191