• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import _overlapped
2import _thread
3import _winapi
4import math
5import struct
6import winreg
7
8
9# Seconds per measurement
10SAMPLING_INTERVAL = 1
11# Exponential damping factor to compute exponentially weighted moving average
12# on 1 minute (60 seconds)
13LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
14# Initialize the load using the arithmetic mean of the first NVALUE values
15# of the Processor Queue Length
16NVALUE = 5
17
18
19class WindowsLoadTracker():
20    """
21    This class asynchronously reads the performance counters to calculate
22    the system load on Windows.  A "raw" thread is used here to prevent
23    interference with the test suite's cases for the threading module.
24    """
25
26    def __init__(self):
27        # make __del__ not fail if pre-flight test fails
28        self._running = None
29        self._stopped = None
30
31        # Pre-flight test for access to the performance data;
32        # `PermissionError` will be raised if not allowed
33        winreg.QueryInfoKey(winreg.HKEY_PERFORMANCE_DATA)
34
35        self._values = []
36        self._load = None
37        self._running = _overlapped.CreateEvent(None, True, False, None)
38        self._stopped = _overlapped.CreateEvent(None, True, False, None)
39
40        _thread.start_new_thread(self._update_load, (), {})
41
42    def _update_load(self,
43                    # localize module access to prevent shutdown errors
44                     _wait=_winapi.WaitForSingleObject,
45                     _signal=_overlapped.SetEvent):
46        # run until signaled to stop
47        while _wait(self._running, 1000):
48            self._calculate_load()
49        # notify stopped
50        _signal(self._stopped)
51
52    def _calculate_load(self,
53                        # localize module access to prevent shutdown errors
54                        _query=winreg.QueryValueEx,
55                        _hkey=winreg.HKEY_PERFORMANCE_DATA,
56                        _unpack=struct.unpack_from):
57        # get the 'System' object
58        data, _ = _query(_hkey, '2')
59        # PERF_DATA_BLOCK {
60        #   WCHAR Signature[4]      8 +
61        #   DWOWD LittleEndian      4 +
62        #   DWORD Version           4 +
63        #   DWORD Revision          4 +
64        #   DWORD TotalByteLength   4 +
65        #   DWORD HeaderLength      = 24 byte offset
66        #   ...
67        # }
68        obj_start, = _unpack('L', data, 24)
69        # PERF_OBJECT_TYPE {
70        #   DWORD TotalByteLength
71        #   DWORD DefinitionLength
72        #   DWORD HeaderLength
73        #   ...
74        # }
75        data_start, defn_start = _unpack('4xLL', data, obj_start)
76        data_base = obj_start + data_start
77        defn_base = obj_start + defn_start
78        # find the 'Processor Queue Length' counter (index=44)
79        while defn_base < data_base:
80            # PERF_COUNTER_DEFINITION {
81            #   DWORD ByteLength
82            #   DWORD CounterNameTitleIndex
83            #   ... [7 DWORDs/28 bytes]
84            #   DWORD CounterOffset
85            # }
86            size, idx, offset = _unpack('LL28xL', data, defn_base)
87            defn_base += size
88            if idx == 44:
89                counter_offset = data_base + offset
90                # the counter is known to be PERF_COUNTER_RAWCOUNT (DWORD)
91                processor_queue_length, = _unpack('L', data, counter_offset)
92                break
93        else:
94            return
95
96        # We use an exponentially weighted moving average, imitating the
97        # load calculation on Unix systems.
98        # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
99        # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
100        if self._load is not None:
101            self._load = (self._load * LOAD_FACTOR_1
102                            + processor_queue_length  * (1.0 - LOAD_FACTOR_1))
103        elif len(self._values) < NVALUE:
104            self._values.append(processor_queue_length)
105        else:
106            self._load = sum(self._values) / len(self._values)
107
108    def close(self, kill=True):
109        self.__del__()
110        return
111
112    def __del__(self,
113                # localize module access to prevent shutdown errors
114                _wait=_winapi.WaitForSingleObject,
115                _close=_winapi.CloseHandle,
116                _signal=_overlapped.SetEvent):
117        if self._running is not None:
118            # tell the update thread to quit
119            _signal(self._running)
120            # wait for the update thread to signal done
121            _wait(self._stopped, -1)
122            # cleanup events
123            _close(self._running)
124            _close(self._stopped)
125            self._running = self._stopped = None
126
127    def getloadavg(self):
128        return self._load
129