• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import _winapi
2import math
3import msvcrt
4import os
5import subprocess
6import uuid
7import winreg
8from test import support
9from test.libregrtest.utils import print_warning
10
11
# Maximum number of bytes requested by each asynchronous read of the pipe
BUFSIZE = 8192
# Seconds per measurement (passed to typeperf as its sampling interval)
SAMPLING_INTERVAL = 1
# Exponential damping factor used to compute an exponentially weighted moving
# average over 1 minute (60 seconds); equals exp(-SAMPLING_INTERVAL / 60)
LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
# The load is initialized with the arithmetic mean of the first NVALUE values
# of the Processor Queue Length
NVALUE = 5
# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names
# of typeperf are registered
COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion"
                        r"\Perflib\CurrentLanguage")
26
27
class WindowsLoadTracker():
    """
    This class asynchronously interacts with the `typeperf` command to read
    the system load on Windows. Multiprocessing and threads can't be used
    here because they interfere with the test suite's cases for those
    modules.

    `typeperf` is spawned with its stdout connected to a named pipe; each
    call to getloadavg() performs one non-blocking read of that pipe and
    folds the parsed Processor Queue Length samples into a moving average.
    Call close() to stop the child process and release the pipe handle.
    """

    def __init__(self):
        # Samples collected before the moving average is initialized
        self._values = []
        # Current load average, or None until NVALUE samples were collected
        self._load = None
        # Trailing, not yet complete line of typeperf output
        self._buffer = ''
        self._popen = None
        # Read end of the named pipe; defined before start() so that close()
        # can release it even if start() fails half-way
        self.pipe = None
        self.start()

    def start(self):
        """Create the named pipe and spawn `typeperf` writing into it."""
        # Create a named pipe which allows for asynchronous IO in Windows
        pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4())

        open_mode = _winapi.PIPE_ACCESS_INBOUND
        open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        open_mode |= _winapi.FILE_FLAG_OVERLAPPED

        # This is the read end of the pipe, where we will be grabbing output
        self.pipe = _winapi.CreateNamedPipe(
            pipe_name, open_mode, _winapi.PIPE_WAIT,
            1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
        )
        # The write end of the pipe which is passed to the created process
        pipe_write_end = _winapi.CreateFile(
            pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL,
            _winapi.OPEN_EXISTING, 0, _winapi.NULL
        )
        # Wrap the handle in a C runtime file descriptor so we can pass it
        # to subprocess
        command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0)
        try:
            # Connect to the read end of the pipe in overlap/async mode
            overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True)
            overlap.GetOverlappedResult(True)

            # Spawn off the load monitor. counter_name already carries its
            # own double quotes, so the command line is joined by hand
            # rather than passed as an argument list.
            counter_name = self._get_counter_name()
            command = ['typeperf', counter_name,
                       '-si', str(SAMPLING_INTERVAL)]
            self._popen = subprocess.Popen(' '.join(command),
                                           stdout=command_stdout,
                                           cwd=support.SAVEDCWD)
        finally:
            # Close our copy of the write end of the pipe, also when
            # spawning the process failed, so the descriptor is not leaked
            os.close(command_stdout)

    def _get_counter_name(self):
        """Return the quoted, localized typeperf counter path.

        The result has the form "\\<System>\\<Processor Queue Length>" with
        both names taken from the registry of the current language.
        Raises OSError if the registry key cannot be read and KeyError if
        an expected counter index is missing.
        """
        # accessing the registry to get the counter localization name
        with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,
                            COUNTER_REGISTRY_KEY) as perfkey:
            counters = winreg.QueryValueEx(perfkey, 'Counter')[0]

        # Convert [key1, value1, key2, value2, ...] list
        # to {key1: value1, key2: value2, ...} dict
        counters = iter(counters)
        counters_dict = dict(zip(counters, counters))

        # System counter has key '2' and Processor Queue Length has key '44'
        system = counters_dict['2']
        process_queue_length = counters_dict['44']
        return f'"\\{system}\\{process_queue_length}"'

    def close(self, kill=True):
        """Stop the typeperf process and release the pipe handle.

        If *kill* is true the child process is killed before waiting for
        it; pass kill=False when it has already exited. Calling close()
        more than once is harmless.
        """
        self._load = None

        # Detach both resources first so a second call becomes a no-op
        popen, self._popen = self._popen, None
        pipe, self.pipe = self.pipe, None
        try:
            if popen is not None:
                if kill:
                    popen.kill()
                popen.wait()
        finally:
            # Release the read end of the named pipe, which would otherwise
            # stay open until the interpreter exits
            if pipe is not None:
                _winapi.CloseHandle(pipe)

    def __del__(self):
        self.close()

    def _parse_line(self, line):
        """Return the queue length contained in one line of typeperf output.

        Raises ValueError if *line* is not a complete two-column CSV record
        whose second column is a double-quoted number.
        """
        # typeperf outputs in a CSV format like this:
        # "07/19/2018 01:32:26.605","3.000000"
        # (date, process queue length)
        tokens = line.split(',')
        if len(tokens) != 2:
            raise ValueError

        value = tokens[1]
        if not value.startswith('"') or not value.endswith('"'):
            raise ValueError
        value = value[1:-1]
        return float(value)

    def _split_lines(self, output):
        """Merge *output* with the pending buffer and return usable lines.

        The returned lines keep their line endings. A trailing line that
        does not parse as a complete record is removed from the result and
        stored in self._buffer for the next call.
        """
        lines = (self._buffer + output).splitlines(True)
        if not lines:
            # Nothing buffered and nothing read: there is no last line to
            # inspect (indexing lines[-1] would raise IndexError)
            return lines

        # bpo-36670: typeperf only writes a newline *before* writing a value,
        # not after. Sometimes, the written line is incomplete (ex: only
        # timestamp, without the process queue length). Only pass the last
        # line to the parser if it's a valid value, otherwise store it in
        # self._buffer.
        try:
            self._parse_line(lines[-1])
        except ValueError:
            self._buffer = lines.pop(-1)
        else:
            self._buffer = ''

        return lines

    def _read_lines(self):
        """Do one non-blocking read of the pipe and return complete lines.

        Returns an empty sequence when the overlapped read has not
        completed yet.
        """
        overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
        bytes_read, res = overlapped.GetOverlappedResult(False)
        if res != 0:
            return ()

        output = overlapped.getbuffer()
        output = output.decode('oem', 'replace')
        return self._split_lines(output)

    def getloadavg(self):
        """Return the current load average, or None if it is unavailable.

        None is returned while fewer than NVALUE samples have been
        collected, and once the typeperf process has exited or its pipe
        is broken (the tracker is then closed).
        """
        if self._popen is None:
            return None

        returncode = self._popen.poll()
        if returncode is not None:
            # typeperf exited on its own: nothing left to kill
            self.close(kill=False)
            return None

        try:
            lines = self._read_lines()
        except BrokenPipeError:
            self.close()
            return None

        for line in lines:
            line = line.rstrip()

            # Ignore the initial header:
            # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
            if 'PDH-CSV' in line:
                continue

            # Ignore blank lines
            if not line:
                continue

            try:
                processor_queue_length = self._parse_line(line)
            except ValueError:
                print_warning("Failed to parse typeperf output: %a" % line)
                continue

            # We use an exponentially weighted moving average, imitating the
            # load calculation on Unix systems.
            # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
            # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
            if self._load is not None:
                self._load = (self._load * LOAD_FACTOR_1
                              + processor_queue_length * (1.0 - LOAD_FACTOR_1))
            else:
                # Seed the average with the mean of the first NVALUE samples.
                # The mean is taken as soon as the last of them arrives, so
                # no sample is dropped.
                self._values.append(processor_queue_length)
                if len(self._values) >= NVALUE:
                    self._load = sum(self._values) / len(self._values)

        return self._load
191