1# Copyright 2018 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Helper class for power autotests requiring telemetry devices.""" 6 7import logging 8import time 9 10import numpy 11 12CUSTOM_START = 'PowerTelemetryLogger custom start.' 13CUSTOM_END = 'PowerTelemetryLogger custom end.' 14INTERPOLATION_RESOLUTION = 6 15 16 17class TelemetryUtilsError(Exception): 18 """Error class for issues using these utilities.""" 19 20 21def interpolate_missing_data(data, max_nan_ratio=None, max_sample_gap=None, 22 max_sample_time_gap=None, timeline=None): 23 """Interpolate missing power readings in data. 24 25 @param data: array of values 26 @min_nan_ratio: optional, float, max acceptable ratio of NaN to real values 27 @max_sample_gap: optional, int, max acceptable number of NaN in a row 28 @max_sample_time_gap: optional, float, max measurement gap in seconds 29 Note: supplying max_nan_time_gap requires timeline 30 @timeline: array of same size as |data| with timeline info for each sample 31 32 @returns: list, array |data| with missing values interpolated. 33 @raises: TelemetryUtilsError if 34 - the ratio of NaN is higher than |max_nan_ratio| (if supplied) 35 - no NaN gap is larger than |max_sample_gap| (if supplied) 36 - no NaN gap takes more time in |timeline| than 37 |max_sample_time_gap| (if supplied) 38 - all values in |data| are NaN. 39 """ 40 if max_sample_time_gap is not None and timeline is None: 41 # These are mutually required. 42 raise TelemetryUtilsError('Supplying max_sample_time_gap requires a ' 43 'timeline.') 44 data = numpy.array(data) 45 nan_data = numpy.isnan(data) 46 if max_nan_ratio: 47 # Validate the ratio if a ratio is supplied. 48 nan_ratio = float(sum(nan_data)) / len(data) 49 if nan_ratio > max_nan_ratio: 50 # There are too many errors in this source. 51 # Throw an error so the user has a chance to adjust their power 52 # collection setup. 53 raise TelemetryUtilsError('NaN ratio of %.02f ' 54 ' - Max is %.02f.' % (nan_ratio, 55 max_nan_ratio)) 56 if max_sample_gap is not None or max_sample_time_gap is not None: 57 # Flag to keep track whether the loop is in a measurement gap (NaN). 58 consecutive_nan_start = None 59 # Add a dummy at the end to make sure the iteration covers all real 60 # examples. 61 for i, isnan in enumerate(numpy.append(nan_data, False)): 62 if isnan and consecutive_nan_start is None: 63 consecutive_nan_start = i 64 if not isnan and consecutive_nan_start is not None: 65 consecutive_nans = i - consecutive_nan_start 66 if max_sample_gap and consecutive_nans >= max_sample_gap: 67 # Reject if there are too many consecutive failures. 68 raise TelemetryUtilsError('Too many consecutive NaN samples' 69 ': %d.' % consecutive_nans) 70 if max_sample_time_gap: 71 # Checks whether the first valid timestamp before the 72 # gap exists and whether the first valid timestamp after the 73 # gap exists. 74 if consecutive_nan_start == 0 or i == len(data): 75 # We cannot determine the gap timeline properly here 76 # as the gap either starts or ends with the time. 77 # Ignore for now. 78 continue 79 sample_time_gap = (timeline[i] - 80 timeline[consecutive_nan_start-1]) 81 if sample_time_gap > max_sample_time_gap: 82 raise TelemetryUtilsError('Excessively long sample gap ' 83 'of %.02fs. Longest ' 84 'permissible gap is %.02fs.' 85 % (sample_time_gap, 86 max_sample_time_gap)) 87 88 # Reset the flag for the next gap. 89 consecutive_nan_start = None 90 # At this point the data passed all validations required. 91 sample_idx = numpy.arange(len(data))[[~nan_data]] 92 sample_vals = data[[~nan_data]] 93 if not len(sample_idx): 94 raise TelemetryUtilsError('Data has no valid readings. Cannot ' 95 'interpolate.') 96 output = numpy.interp(range(len(data)), sample_idx, sample_vals) 97 return [round(x, INTERPOLATION_RESOLUTION) for x in output] 98 99def log_event_ts(message=None, timestamp=None, offset=0): 100 """Log the event and timestamp for parsing later. 101 102 @param message: description of the event. 103 @param timestamp: timestamp to for the event, if not provided, default to 104 current time. Local seconds since epoch. 105 @param offset: offset in seconds from the provided timestamp, or offset from 106 current time if timestamp is not provided. Can be positive or 107 negative. 108 """ 109 if not message: 110 return 111 if timestamp: 112 ts = timestamp + offset 113 else: 114 ts = time.time() + offset 115 logging.debug("%s %s", message, ts) 116 117def start_measurement(timestamp=None, offset=0): 118 """Mark the start of power telemetry measurement. 119 120 Optional. Use only once in the client side test that is wrapped in the 121 power measurement wrapper tests to help pinpoint exactly where power 122 telemetry data should start. PowerTelemetryLogger will trim off excess data 123 before this point. If not used, power telemetry data will start right before 124 the client side test. 125 @param timestamp: timestamp for the start of measurement, if not provided, 126 default to current time. Local seconds since epoch. 127 @param offset: offset in seconds from the provided timestamp, or offset from 128 current time if timestamp is not provided. Can be positive or 129 negative. 130 """ 131 log_event_ts(CUSTOM_START, timestamp, offset) 132 133def end_measurement(timestamp=None, offset=0): 134 """Mark the end of power telemetry measurement. 135 136 Optional. Use only once in the client side test that is wrapped in the 137 power measurement wrapper tests to help pinpoint exactly where power 138 telemetry data should end. PowerTelemetryLogger will trim off excess data 139 after this point. If not used, power telemetry data will end right after the 140 client side test. 141 @param timestamp: timestamp for the end of measurement, if not provided, 142 default to current time. Local seconds since epoch. 143 @param offset: offset in seconds from the provided timestamp, or offset from 144 current time if timestamp is not provided. Can be positive or 145 negative. 146 """ 147 log_event_ts(CUSTOM_END, timestamp, offset) 148