import _winapi import math import msvcrt import os import subprocess import uuid import winreg from test import support from test.libregrtest.utils import print_warning # Max size of asynchronous reads BUFSIZE = 8192 # Seconds per measurement SAMPLING_INTERVAL = 1 # Exponential damping factor to compute exponentially weighted moving average # on 1 minute (60 seconds) LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60) # Initialize the load using the arithmetic mean of the first NVALUE values # of the Processor Queue Length NVALUE = 5 # Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names # of typeperf are registered COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion" r"\Perflib\CurrentLanguage") class WindowsLoadTracker(): """ This class asynchronously interacts with the `typeperf` command to read the system load on Windows. Multiprocessing and threads can't be used here because they interfere with the test suite's cases for those modules. """ def __init__(self): self._values = [] self._load = None self._buffer = '' self._popen = None self.start() def start(self): # Create a named pipe which allows for asynchronous IO in Windows pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4()) open_mode = _winapi.PIPE_ACCESS_INBOUND open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE open_mode |= _winapi.FILE_FLAG_OVERLAPPED # This is the read end of the pipe, where we will be grabbing output self.pipe = _winapi.CreateNamedPipe( pipe_name, open_mode, _winapi.PIPE_WAIT, 1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL ) # The write end of the pipe which is passed to the created process pipe_write_end = _winapi.CreateFile( pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL, _winapi.OPEN_EXISTING, 0, _winapi.NULL ) # Open up the handle as a python file object so we can pass it to # subprocess command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0) # Connect to the read end of the pipe in overlap/async mode overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True) overlap.GetOverlappedResult(True) # Spawn off the load monitor counter_name = self._get_counter_name() command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)] self._popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD) # Close our copy of the write end of the pipe os.close(command_stdout) def _get_counter_name(self): # accessing the registry to get the counter localization name with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey: counters = winreg.QueryValueEx(perfkey, 'Counter')[0] # Convert [key1, value1, key2, value2, ...] list # to {key1: value1, key2: value2, ...} dict counters = iter(counters) counters_dict = dict(zip(counters, counters)) # System counter has key '2' and Processor Queue Length has key '44' system = counters_dict['2'] process_queue_length = counters_dict['44'] return f'"\\{system}\\{process_queue_length}"' def close(self, kill=True): if self._popen is None: return self._load = None if kill: self._popen.kill() self._popen.wait() self._popen = None def __del__(self): self.close() def _parse_line(self, line): # typeperf outputs in a CSV format like this: # "07/19/2018 01:32:26.605","3.000000" # (date, process queue length) tokens = line.split(',') if len(tokens) != 2: raise ValueError value = tokens[1] if not value.startswith('"') or not value.endswith('"'): raise ValueError value = value[1:-1] return float(value) def _read_lines(self): overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True) bytes_read, res = overlapped.GetOverlappedResult(False) if res != 0: return () output = overlapped.getbuffer() output = output.decode('oem', 'replace') output = self._buffer + output lines = output.splitlines(True) # bpo-36670: typeperf only writes a newline *before* writing a value, # not after. Sometimes, the written line in incomplete (ex: only # timestamp, without the process queue length). Only pass the last line # to the parser if it's a valid value, otherwise store it in # self._buffer. try: self._parse_line(lines[-1]) except ValueError: self._buffer = lines.pop(-1) else: self._buffer = '' return lines def getloadavg(self): if self._popen is None: return None returncode = self._popen.poll() if returncode is not None: self.close(kill=False) return None try: lines = self._read_lines() except BrokenPipeError: self.close() return None for line in lines: line = line.rstrip() # Ignore the initial header: # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length" if 'PDH-CSV' in line: continue # Ignore blank lines if not line: continue try: processor_queue_length = self._parse_line(line) except ValueError: print_warning("Failed to parse typeperf output: %a" % line) continue # We use an exponentially weighted moving average, imitating the # load calculation on Unix systems. # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average if self._load is not None: self._load = (self._load * LOAD_FACTOR_1 + processor_queue_length * (1.0 - LOAD_FACTOR_1)) elif len(self._values) < NVALUE: self._values.append(processor_queue_length) else: self._load = sum(self._values) / len(self._values) return self._load