• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import _winapi
2import math
3import msvcrt
4import os
5import subprocess
6import uuid
7import winreg
8from test.support import os_helper
9from test.libregrtest.utils import print_warning
10
11
# Upper bound, in bytes, of a single asynchronous read; also used as the
# in/out buffer size of the named pipe
BUFSIZE = 8 * 1024
# Interval between two typeperf measurements, in seconds
SAMPLING_INTERVAL = 1
# Damping factor of the exponentially weighted moving average computed
# over a window of 1 minute (60 seconds)
LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
# Number of initial Processor Queue Length values whose arithmetic mean
# is used to initialize the load
NVALUE = 5
# Subkey of HKEY_LOCAL_MACHINE in the Windows registry under which the
# (localized) counter names of typeperf are registered
COUNTER_REGISTRY_KEY = (
    r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\Perflib\CurrentLanguage"
)
26
27
class WindowsLoadTracker():
    """
    This class asynchronously interacts with the `typeperf` command to read
    the system load on Windows. Multiprocessing and threads can't be used
    here because they interfere with the test suite's cases for those
    modules.

    A `typeperf` child process writes Processor Queue Length samples to a
    named pipe; getloadavg() polls the read end of that pipe without
    blocking and folds the samples into an exponentially weighted moving
    average.
    """

    def __init__(self):
        # Samples collected before the load has been initialized
        self._values = []
        # Current load, or None while fewer than NVALUE samples were seen
        self._load = None
        # Trailing, not yet complete line of typeperf output
        self._buffer = ''
        # The running typeperf process, or None when not started / closed
        self._popen = None
        # Read end of the named pipe, or None when not started / closed
        self.pipe = None
        self.start()

    def start(self):
        """Create the named pipe and spawn typeperf writing into it.

        If any step fails, every handle created so far is released before
        the exception is propagated.
        """
        # Create a named pipe which allows for asynchronous IO in Windows
        pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4())

        open_mode = _winapi.PIPE_ACCESS_INBOUND
        open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        open_mode |= _winapi.FILE_FLAG_OVERLAPPED

        # This is the read end of the pipe, where we will be grabbing output
        self.pipe = _winapi.CreateNamedPipe(
            pipe_name, open_mode, _winapi.PIPE_WAIT,
            1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
        )
        try:
            # The write end of the pipe which is passed to the created
            # process
            pipe_write_end = _winapi.CreateFile(
                pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL,
                _winapi.OPEN_EXISTING, 0, _winapi.NULL
            )
            # Open up the handle as a python file object so we can pass it
            # to subprocess
            try:
                command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0)
            except BaseException:
                # No file descriptor wraps the handle yet: close it directly
                _winapi.CloseHandle(pipe_write_end)
                raise

            try:
                # Connect to the read end of the pipe in overlap/async mode
                overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True)
                overlap.GetOverlappedResult(True)

                # Spawn off the load monitor. The command is passed as a
                # single string because counter_name is already quoted;
                # passing a list would make subprocess escape those quotes.
                counter_name = self._get_counter_name()
                command = ['typeperf', counter_name,
                           '-si', str(SAMPLING_INTERVAL)]
                self._popen = subprocess.Popen(' '.join(command),
                                               stdout=command_stdout,
                                               cwd=os_helper.SAVEDCWD)
            finally:
                # Close our copy of the write end of the pipe, whether or
                # not the child process could be spawned
                os.close(command_stdout)
        except BaseException:
            # Don't leak the read end when the tracker failed to start
            self._close_pipe()
            raise

    def _get_counter_name(self):
        """Return the quoted, localized typeperf counter path of the
        Processor Queue Length counter, looked up in the registry."""
        # accessing the registry to get the counter localization name
        with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,
                            COUNTER_REGISTRY_KEY) as perfkey:
            counters = winreg.QueryValueEx(perfkey, 'Counter')[0]

        # Convert [key1, value1, key2, value2, ...] list
        # to {key1: value1, key2: value2, ...} dict
        counters = iter(counters)
        counters_dict = dict(zip(counters, counters))

        # System counter has key '2' and Processor Queue Length has key '44'
        system = counters_dict['2']
        process_queue_length = counters_dict['44']
        return f'"\\{system}\\{process_queue_length}"'

    def _close_pipe(self):
        """Release the read end of the named pipe, if it is still open."""
        pipe = getattr(self, 'pipe', None)
        if pipe is None:
            return
        # Forget the handle first so that it can never be closed twice
        self.pipe = None
        _winapi.CloseHandle(pipe)

    def close(self, kill=True):
        """Stop tracking: reap typeperf and release the pipe.

        If *kill* is true (the default) the typeperf process is killed
        first; otherwise it is only waited for. Calling close() on a
        tracker that is not running does nothing.
        """
        # getattr() keeps __del__ quiet on a partially constructed instance
        popen = getattr(self, '_popen', None)
        if popen is None:
            return

        self._load = None

        if kill:
            popen.kill()
        popen.wait()
        self._popen = None

        # The writer is gone: the read end is of no use anymore
        self._close_pipe()

    def __del__(self):
        self.close()

    def _parse_line(self, line):
        """Return the process queue length found in *line* as a float.

        Raise ValueError if *line* is not a complete typeperf sample.
        """
        # typeperf outputs in a CSV format like this:
        # "07/19/2018 01:32:26.605","3.000000"
        # (date, process queue length)
        tokens = line.split(',')
        if len(tokens) != 2:
            raise ValueError

        value = tokens[1]
        if not value.startswith('"') or not value.endswith('"'):
            raise ValueError
        value = value[1:-1]
        return float(value)

    def _read_lines(self):
        """Return the lines currently available from typeperf.

        The read does not block: an empty sequence is returned if no data
        is ready. Lines keep their line ending. An incomplete trailing line
        is kept in self._buffer and prepended to the next read.
        """
        overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
        _, res = overlapped.GetOverlappedResult(False)
        if res != 0:
            return ()

        output = overlapped.getbuffer()
        output = output.decode('oem', 'replace')
        output = self._buffer + output
        lines = output.splitlines(True)
        if not lines:
            # Nothing was read and nothing was buffered: there is no last
            # line to inspect
            return lines

        # bpo-36670: typeperf only writes a newline *before* writing a value,
        # not after. Sometimes, the written line is incomplete (ex: only
        # timestamp, without the process queue length). Only pass the last
        # line to the parser if it's a valid value, otherwise store it in
        # self._buffer.
        try:
            self._parse_line(lines[-1])
        except ValueError:
            self._buffer = lines.pop(-1)
        else:
            self._buffer = ''

        return lines

    def getloadavg(self):
        """Consume the pending typeperf samples and return the load.

        Return None if the tracker is closed, if typeperf exited, if the
        pipe is broken, or as long as fewer than NVALUE samples have been
        collected. In the second and third cases the tracker is closed.
        """
        if self._popen is None:
            return None

        returncode = self._popen.poll()
        if returncode is not None:
            self.close(kill=False)
            return None

        try:
            lines = self._read_lines()
        except BrokenPipeError:
            self.close()
            return None

        for line in lines:
            line = line.rstrip()

            # Ignore the initial header:
            # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
            if 'PDH-CSV' in line:
                continue

            # Ignore blank lines
            if not line:
                continue

            try:
                processor_queue_length = self._parse_line(line)
            except ValueError:
                print_warning("Failed to parse typeperf output: %a" % line)
                continue

            # We use an exponentially weighted moving average, imitating the
            # load calculation on Unix systems.
            # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
            # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
            if self._load is not None:
                self._load = (self._load * LOAD_FACTOR_1
                              + processor_queue_length * (1.0 - LOAD_FACTOR_1))
            else:
                # Initialize the load with the arithmetic mean of the first
                # NVALUE samples. Every sample is recorded, so that the one
                # completing the initial window is not thrown away.
                self._values.append(processor_queue_length)
                if len(self._values) >= NVALUE:
                    self._load = sum(self._values) / len(self._values)

        return self._load
193