1# Lint as: python2, python3 2# Copyright 2015 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""This module provides utilities to do audio data analysis.""" 7 8from __future__ import absolute_import 9from __future__ import division 10from __future__ import print_function 11import logging 12import numpy 13import operator 14from six.moves import range 15 16# Only peaks with coefficient greater than 0.01 of the first peak should be 17# considered. Note that this correspond to -40dB in the spectrum. 18DEFAULT_MIN_PEAK_RATIO = 0.01 19 20PEAK_WINDOW_SIZE_HZ = 20 # Window size for peak detection. 21 22# The minimum RMS value of meaningful audio data. 23MEANINGFUL_RMS_THRESHOLD = 0.001 24 25class RMSTooSmallError(Exception): 26 """Error when signal RMS is too small.""" 27 pass 28 29 30class EmptyDataError(Exception): 31 """Error when signal is empty.""" 32 33 34def normalize_signal(signal, saturate_value): 35 """Normalizes the signal with respect to the saturate value. 36 37 @param signal: A list for one-channel PCM data. 38 @param saturate_value: The maximum value that the PCM data might be. 39 40 @returns: A numpy array containing normalized signal. The normalized signal 41 has value -1 and 1 when it saturates. 42 43 """ 44 signal = numpy.array(signal) 45 return signal / float(saturate_value) 46 47 48def spectral_analysis(signal, rate, min_peak_ratio=DEFAULT_MIN_PEAK_RATIO, 49 peak_window_size_hz=PEAK_WINDOW_SIZE_HZ): 50 """Gets the dominant frequencies by spectral analysis. 51 52 @param signal: A list of numbers for one-channel PCM data. This should be 53 normalized to [-1, 1] so the function can check if signal RMS 54 is too small to be meaningful. 55 @param rate: Sampling rate. 56 @param min_peak_ratio: The minimum peak_0/peak_i ratio such that the 57 peaks other than the greatest one should be 58 considered. 59 This is to ignore peaks that are too small compared 60 to the first peak peak_0. 61 @param peak_window_size_hz: The window size in Hz to find the peaks. 62 The minimum differences between found peaks will 63 be half of this value. 64 65 @returns: A list of tuples: 66 [(peak_frequency_0, peak_coefficient_0), 67 (peak_frequency_1, peak_coefficient_1), 68 (peak_frequency_2, peak_coefficient_2), ...] 69 where the tuples are sorted by coefficients. 70 The last peak_coefficient will be no less than 71 peak_coefficient * min_peak_ratio. 72 If RMS is less than MEANINGFUL_RMS_THRESHOLD, returns [(0, 0)]. 73 74 """ 75 # Checks the signal is meaningful. 76 if len(signal) == 0: 77 raise EmptyDataError('Signal data is empty') 78 79 signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal)) 80 logging.debug('signal RMS = %s', signal_rms) 81 82 # If RMS is too small, set dominant frequency and coefficient to 0. 83 if signal_rms < MEANINGFUL_RMS_THRESHOLD: 84 logging.warning( 85 'RMS %s is too small to be meaningful. Set frequency to 0.', 86 signal_rms) 87 return [(0, 0)] 88 89 logging.debug('Doing spectral analysis ...') 90 91 # First, pass signal through a window function to mitigate spectral leakage. 92 y_conv_w = signal * numpy.hanning(len(signal)) 93 94 length = len(y_conv_w) 95 96 # x_f is the frequency in Hz, y_f is the transformed coefficient. 97 x_f = _rfft_freq(length, rate) 98 y_f = 2.0 / length * numpy.fft.rfft(y_conv_w) 99 100 # y_f is complex so consider its absolute value for magnitude. 101 abs_y_f = numpy.abs(y_f) 102 threshold = max(abs_y_f) * min_peak_ratio 103 104 # Suppresses all coefficients that are below threshold. 105 for i in range(len(abs_y_f)): 106 if abs_y_f[i] < threshold: 107 abs_y_f[i] = 0 108 109 # Gets the peak detection window size in indice. 110 # x_f[1] is the frequency difference per index. 111 peak_window_size = int(peak_window_size_hz / x_f[1]) 112 113 # Detects peaks. 114 peaks = peak_detection(abs_y_f, peak_window_size) 115 116 # Transform back the peak location from index to frequency. 117 results = [] 118 for index, value in peaks: 119 results.append((x_f[index], value)) 120 return results 121 122 123def _rfft_freq(length, rate): 124 """Gets the frequency at each index of real FFT. 125 126 @param length: The window length of FFT. 127 @param rate: Sampling rate. 128 129 @returns: A numpy array containing frequency corresponding to 130 numpy.fft.rfft result at each index. 131 132 """ 133 # The difference in Hz between each index. 134 val = rate / float(length) 135 # Only care half of frequencies for FFT on real signal. 136 result_length = length // 2 + 1 137 return numpy.linspace(0, (result_length - 1) * val, result_length) 138 139 140def peak_detection(array, window_size): 141 """Detects peaks in an array. 142 143 A point (i, array[i]) is a peak if array[i] is the maximum among 144 array[i - half_window_size] to array[i + half_window_size]. 145 If array[i - half_window_size] to array[i + half_window_size] are all equal, 146 then there is no peak in this window. 147 Note that we only consider peak with value greater than 0. 148 149 @param window_size: The window to detect peaks. 150 151 @returns: A list of tuples: 152 [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...] 153 where the tuples are sorted by peak values. 154 155 """ 156 half_window_size = window_size // 2 157 length = len(array) 158 159 def mid_is_peak(array, mid, left, right): 160 """Checks if value at mid is the largest among left to right in array. 161 162 @param array: A list of numbers. 163 @param mid: The mid index. 164 @param left: The left index. 165 @param rigth: The right index. 166 167 @returns: A tuple (is_peak, next_candidate) 168 is_peak is True if array[index] is the maximum among numbers 169 in array between index [left, right] inclusively. 170 next_candidate is the index of next candidate for peak if 171 is_peak is False. It is the index of maximum value in 172 [mid + 1, right]. If is_peak is True, next_candidate is 173 right + 1. 174 175 """ 176 value_mid = array[mid] 177 is_peak = True 178 next_peak_candidate_index = None 179 180 # Check the left half window. 181 for index in range(left, mid): 182 if array[index] >= value_mid: 183 is_peak = False 184 break 185 186 # Mid is at the end of array. 187 if mid == right: 188 return is_peak, right + 1 189 190 # Check the right half window and also record next candidate. 191 # Favor the larger index for next_peak_candidate_index. 192 for index in range(right, mid, -1): 193 if (next_peak_candidate_index is None or 194 array[index] > array[next_peak_candidate_index]): 195 next_peak_candidate_index = index 196 197 if array[next_peak_candidate_index] >= value_mid: 198 is_peak = False 199 200 if is_peak: 201 next_peak_candidate_index = right + 1 202 203 return is_peak, next_peak_candidate_index 204 205 results = [] 206 mid = 0 207 next_candidate_idx = None 208 while mid < length: 209 left = max(0, mid - half_window_size) 210 right = min(length - 1, mid + half_window_size) 211 212 # Only consider value greater than 0. 213 if array[mid] == 0: 214 mid = mid + 1 215 continue; 216 217 is_peak, next_candidate_idx = mid_is_peak(array, mid, left, right) 218 219 if is_peak: 220 results.append((mid, array[mid])) 221 222 # Use the next candidate found in [mid + 1, right], or right + 1. 223 mid = next_candidate_idx 224 225 # Sort the peaks by values. 226 return sorted(results, key=lambda x: x[1], reverse=True) 227 228 229# The default pattern mathing threshold. By experiment, this threshold 230# can tolerate normal noise of 0.3 amplitude when sine wave signal 231# amplitude is 1. 232PATTERN_MATCHING_THRESHOLD = 0.85 233 234# The default block size of pattern matching. 235ANOMALY_DETECTION_BLOCK_SIZE = 120 236 237def anomaly_detection(signal, rate, freq, 238 block_size=ANOMALY_DETECTION_BLOCK_SIZE, 239 threshold=PATTERN_MATCHING_THRESHOLD): 240 """Detects anomaly in a sine wave signal. 241 242 This method detects anomaly in a sine wave signal by matching 243 patterns of each block. 244 For each moving window of block in the test signal, checks if there 245 is any block in golden signal that is similar to this block of test signal. 246 If there is such a block in golden signal, then this block of test 247 signal is matched and there is no anomaly in this block of test signal. 248 If there is any block in test signal that is not matched, then this block 249 covers an anomaly. 250 The block of test signal starts from index 0, and proceeds in steps of 251 half block size. The overlapping of test signal blocks makes sure there must 252 be at least one block covering the transition from sine wave to anomaly. 253 254 @param signal: A 1-D array-like object for 1-channel PCM data. 255 @param rate: The sampling rate. 256 @param freq: The expected frequency of signal. 257 @param block_size: The block size in samples to detect anomaly. 258 @param threshold: The threshold of correlation index to be judge as matched. 259 260 @returns: A list containing detected anomaly time in seconds. 261 262 """ 263 if len(signal) == 0: 264 raise EmptyDataError('Signal data is empty') 265 266 golden_y = _generate_golden_pattern(rate, freq, block_size) 267 268 results = [] 269 270 for start in range(0, len(signal), block_size // 2): 271 end = start + block_size 272 test_signal = signal[start:end] 273 matched = _moving_pattern_matching(golden_y, test_signal, threshold) 274 if not matched: 275 results.append(start) 276 277 results = [float(x) / rate for x in results] 278 279 return results 280 281 282def _generate_golden_pattern(rate, freq, block_size): 283 """Generates a golden pattern of certain frequency. 284 285 The golden pattern must cover all the possibilities of waveforms in a 286 block. So, we need a golden pattern covering 1 period + 1 block size, 287 such that the test block can start anywhere in a period, and extends 288 a block size. 289 290 |period |1 bk| 291 | | | 292 . . . . 293 . . . . 294 . . . 295 296 @param rate: The sampling rate. 297 @param freq: The frequency of golden pattern. 298 @param block_size: The block size in samples to detect anomaly. 299 300 @returns: A 1-D array for golden pattern. 301 302 """ 303 samples_in_a_period = int(rate / freq) + 1 304 samples_in_golden_pattern = samples_in_a_period + block_size 305 golden_x = numpy.linspace( 306 0.0, (samples_in_golden_pattern - 1) * 1.0 / rate, 307 samples_in_golden_pattern) 308 golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x) 309 return golden_y 310 311 312def _moving_pattern_matching(golden_signal, test_signal, threshold): 313 """Checks if test_signal is similar to any block of golden_signal. 314 315 Compares test signal with each block of golden signal by correlation 316 index. If there is any block of golden signal that is similar to 317 test signal, then it is matched. 318 319 @param golden_signal: A 1-D array for golden signal. 320 @param test_signal: A 1-D array for test signal. 321 @param threshold: The threshold of correlation index to be judge as matched. 322 323 @returns: True if there is a match. False otherwise. 324 325 @raises: ValueError: if test signal is longer than golden signal. 326 327 """ 328 if len(golden_signal) < len(test_signal): 329 raise ValueError('Test signal is longer than golden signal') 330 331 block_length = len(test_signal) 332 number_of_movings = len(golden_signal) - block_length + 1 333 correlation_indices = [] 334 for moving_index in range(number_of_movings): 335 # Cuts one block of golden signal from start index. 336 # The block length is the same as test signal. 337 start = moving_index 338 end = start + block_length 339 golden_signal_block = golden_signal[start:end] 340 try: 341 correlation_index = _get_correlation_index( 342 golden_signal_block, test_signal) 343 except TestSignalNormTooSmallError: 344 logging.info('Caught one block of test signal that has no meaningful norm') 345 return False 346 correlation_indices.append(correlation_index) 347 348 # Checks if the maximum correlation index is high enough. 349 max_corr = max(correlation_indices) 350 if max_corr < threshold: 351 logging.debug('Got one unmatched block with max_corr: %s', max_corr) 352 return False 353 return True 354 355 356class GoldenSignalNormTooSmallError(Exception): 357 """Exception when golden signal norm is too small.""" 358 pass 359 360 361class TestSignalNormTooSmallError(Exception): 362 """Exception when test signal norm is too small.""" 363 pass 364 365 366_MINIMUM_SIGNAL_NORM = 0.001 367 368def _get_correlation_index(golden_signal, test_signal): 369 """Computes correlation index of two signal of same length. 370 371 @param golden_signal: An 1-D array-like object. 372 @param test_signal: An 1-D array-like object. 373 374 @raises: ValueError: if two signal have different lengths. 375 @raises: GoldenSignalNormTooSmallError: if golden signal norm is too small 376 @raises: TestSignalNormTooSmallError: if test signal norm is too small. 377 378 @returns: The correlation index. 379 """ 380 if len(golden_signal) != len(test_signal): 381 raise ValueError( 382 'Only accepts signal of same length: %s, %s' % ( 383 len(golden_signal), len(test_signal))) 384 385 norm_golden = numpy.linalg.norm(golden_signal) 386 norm_test = numpy.linalg.norm(test_signal) 387 if norm_golden <= _MINIMUM_SIGNAL_NORM: 388 raise GoldenSignalNormTooSmallError( 389 'No meaningful data as norm is too small.') 390 if norm_test <= _MINIMUM_SIGNAL_NORM: 391 raise TestSignalNormTooSmallError( 392 'No meaningful data as norm is too small.') 393 394 # The 'valid' cross correlation result of two signals of same length will 395 # contain only one number. 396 correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0] 397 return correlation / (norm_golden * norm_test) 398