1# Copyright 2015 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""This module provides utilities to do audio data analysis.""" 6 7import logging 8import numpy 9import operator 10 11# Only peaks with coefficient greater than 0.01 of the first peak should be 12# considered. Note that this correspond to -40dB in the spectrum. 13DEFAULT_MIN_PEAK_RATIO = 0.01 14 15PEAK_WINDOW_SIZE_HZ = 20 # Window size for peak detection. 16 17# The minimum RMS value of meaningful audio data. 18MEANINGFUL_RMS_THRESHOLD = 0.001 19 20class RMSTooSmallError(Exception): 21 """Error when signal RMS is too small.""" 22 pass 23 24 25class EmptyDataError(Exception): 26 """Error when signal is empty.""" 27 28 29def normalize_signal(signal, saturate_value): 30 """Normalizes the signal with respect to the saturate value. 31 32 @param signal: A list for one-channel PCM data. 33 @param saturate_value: The maximum value that the PCM data might be. 34 35 @returns: A numpy array containing normalized signal. The normalized signal 36 has value -1 and 1 when it saturates. 37 38 """ 39 signal = numpy.array(signal) 40 return signal / float(saturate_value) 41 42 43def spectral_analysis(signal, rate, min_peak_ratio=DEFAULT_MIN_PEAK_RATIO, 44 peak_window_size_hz=PEAK_WINDOW_SIZE_HZ): 45 """Gets the dominant frequencies by spectral analysis. 46 47 @param signal: A list of numbers for one-channel PCM data. This should be 48 normalized to [-1, 1] so the function can check if signal RMS 49 is too small to be meaningful. 50 @param rate: Sampling rate. 51 @param min_peak_ratio: The minimum peak_0/peak_i ratio such that the 52 peaks other than the greatest one should be 53 considered. 54 This is to ignore peaks that are too small compared 55 to the first peak peak_0. 56 @param peak_window_size_hz: The window size in Hz to find the peaks. 57 The minimum differences between found peaks will 58 be half of this value. 59 60 @returns: A list of tuples: 61 [(peak_frequency_0, peak_coefficient_0), 62 (peak_frequency_1, peak_coefficient_1), 63 (peak_frequency_2, peak_coefficient_2), ...] 64 where the tuples are sorted by coefficients. 65 The last peak_coefficient will be no less than 66 peak_coefficient * min_peak_ratio. 67 If RMS is less than MEANINGFUL_RMS_THRESHOLD, returns [(0, 0)]. 68 69 """ 70 # Checks the signal is meaningful. 71 if len(signal) == 0: 72 raise EmptyDataError('Signal data is empty') 73 74 signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal)) 75 logging.debug('signal RMS = %s', signal_rms) 76 77 # If RMS is too small, set dominant frequency and coefficient to 0. 78 if signal_rms < MEANINGFUL_RMS_THRESHOLD: 79 logging.warning( 80 'RMS %s is too small to be meaningful. Set frequency to 0.', 81 signal_rms) 82 return [(0, 0)] 83 84 logging.debug('Doing spectral analysis ...') 85 86 # First, pass signal through a window function to mitigate spectral leakage. 87 y_conv_w = signal * numpy.hanning(len(signal)) 88 89 length = len(y_conv_w) 90 91 # x_f is the frequency in Hz, y_f is the transformed coefficient. 92 x_f = _rfft_freq(length, rate) 93 y_f = 2.0 / length * numpy.fft.rfft(y_conv_w) 94 95 # y_f is complex so consider its absolute value for magnitude. 96 abs_y_f = numpy.abs(y_f) 97 threshold = max(abs_y_f) * min_peak_ratio 98 99 # Suppresses all coefficients that are below threshold. 100 for i in xrange(len(abs_y_f)): 101 if abs_y_f[i] < threshold: 102 abs_y_f[i] = 0 103 104 # Gets the peak detection window size in indice. 105 # x_f[1] is the frequency difference per index. 106 peak_window_size = int(peak_window_size_hz / x_f[1]) 107 108 # Detects peaks. 109 peaks = peak_detection(abs_y_f, peak_window_size) 110 111 # Transform back the peak location from index to frequency. 112 results = [] 113 for index, value in peaks: 114 results.append((x_f[index], value)) 115 return results 116 117 118def _rfft_freq(length, rate): 119 """Gets the frequency at each index of real FFT. 120 121 @param length: The window length of FFT. 122 @param rate: Sampling rate. 123 124 @returns: A numpy array containing frequency corresponding to 125 numpy.fft.rfft result at each index. 126 127 """ 128 # The difference in Hz between each index. 129 val = rate / float(length) 130 # Only care half of frequencies for FFT on real signal. 131 result_length = length // 2 + 1 132 return numpy.linspace(0, (result_length - 1) * val, result_length) 133 134 135def peak_detection(array, window_size): 136 """Detects peaks in an array. 137 138 A point (i, array[i]) is a peak if array[i] is the maximum among 139 array[i - half_window_size] to array[i + half_window_size]. 140 If array[i - half_window_size] to array[i + half_window_size] are all equal, 141 then there is no peak in this window. 142 Note that we only consider peak with value greater than 0. 143 144 @param window_size: The window to detect peaks. 145 146 @returns: A list of tuples: 147 [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...] 148 where the tuples are sorted by peak values. 149 150 """ 151 half_window_size = window_size / 2 152 length = len(array) 153 154 def mid_is_peak(array, mid, left, right): 155 """Checks if value at mid is the largest among left to right in array. 156 157 @param array: A list of numbers. 158 @param mid: The mid index. 159 @param left: The left index. 160 @param rigth: The right index. 161 162 @returns: A tuple (is_peak, next_candidate) 163 is_peak is True if array[index] is the maximum among numbers 164 in array between index [left, right] inclusively. 165 next_candidate is the index of next candidate for peak if 166 is_peak is False. It is the index of maximum value in 167 [mid + 1, right]. If is_peak is True, next_candidate is 168 right + 1. 169 170 """ 171 value_mid = array[mid] 172 is_peak = True 173 next_peak_candidate_index = None 174 175 # Check the left half window. 176 for index in xrange(left, mid): 177 if array[index] >= value_mid: 178 is_peak = False 179 break 180 181 # Mid is at the end of array. 182 if mid == right: 183 return is_peak, right + 1 184 185 # Check the right half window and also record next candidate. 186 # Favor the larger index for next_peak_candidate_index. 187 for index in xrange(right, mid, -1): 188 if (next_peak_candidate_index is None or 189 array[index] > array[next_peak_candidate_index]): 190 next_peak_candidate_index = index 191 192 if array[next_peak_candidate_index] >= value_mid: 193 is_peak = False 194 195 if is_peak: 196 next_peak_candidate_index = right + 1 197 198 return is_peak, next_peak_candidate_index 199 200 results = [] 201 mid = 0 202 next_candidate_idx = None 203 while mid < length: 204 left = max(0, mid - half_window_size) 205 right = min(length - 1, mid + half_window_size) 206 207 # Only consider value greater than 0. 208 if array[mid] == 0: 209 mid = mid + 1 210 continue; 211 212 is_peak, next_candidate_idx = mid_is_peak(array, mid, left, right) 213 214 if is_peak: 215 results.append((mid, array[mid])) 216 217 # Use the next candidate found in [mid + 1, right], or right + 1. 218 mid = next_candidate_idx 219 220 # Sort the peaks by values. 221 return sorted(results, key=lambda x: x[1], reverse=True) 222 223 224# The default pattern mathing threshold. By experiment, this threshold 225# can tolerate normal noise of 0.3 amplitude when sine wave signal 226# amplitude is 1. 227PATTERN_MATCHING_THRESHOLD = 0.85 228 229# The default block size of pattern matching. 230ANOMALY_DETECTION_BLOCK_SIZE = 120 231 232def anomaly_detection(signal, rate, freq, 233 block_size=ANOMALY_DETECTION_BLOCK_SIZE, 234 threshold=PATTERN_MATCHING_THRESHOLD): 235 """Detects anomaly in a sine wave signal. 236 237 This method detects anomaly in a sine wave signal by matching 238 patterns of each block. 239 For each moving window of block in the test signal, checks if there 240 is any block in golden signal that is similar to this block of test signal. 241 If there is such a block in golden signal, then this block of test 242 signal is matched and there is no anomaly in this block of test signal. 243 If there is any block in test signal that is not matched, then this block 244 covers an anomaly. 245 The block of test signal starts from index 0, and proceeds in steps of 246 half block size. The overlapping of test signal blocks makes sure there must 247 be at least one block covering the transition from sine wave to anomaly. 248 249 @param signal: A 1-D array-like object for 1-channel PCM data. 250 @param rate: The sampling rate. 251 @param freq: The expected frequency of signal. 252 @param block_size: The block size in samples to detect anomaly. 253 @param threshold: The threshold of correlation index to be judge as matched. 254 255 @returns: A list containing detected anomaly time in seconds. 256 257 """ 258 if len(signal) == 0: 259 raise EmptyDataError('Signal data is empty') 260 261 golden_y = _generate_golden_pattern(rate, freq, block_size) 262 263 results = [] 264 265 for start in xrange(0, len(signal), block_size / 2): 266 end = start + block_size 267 test_signal = signal[start:end] 268 matched = _moving_pattern_matching(golden_y, test_signal, threshold) 269 if not matched: 270 results.append(start) 271 272 results = [float(x) / rate for x in results] 273 274 return results 275 276 277def _generate_golden_pattern(rate, freq, block_size): 278 """Generates a golden pattern of certain frequency. 279 280 The golden pattern must cover all the possibilities of waveforms in a 281 block. So, we need a golden pattern covering 1 period + 1 block size, 282 such that the test block can start anywhere in a period, and extends 283 a block size. 284 285 |period |1 bk| 286 | | | 287 . . . . 288 . . . . 289 . . . 290 291 @param rate: The sampling rate. 292 @param freq: The frequency of golden pattern. 293 @param block_size: The block size in samples to detect anomaly. 294 295 @returns: A 1-D array for golden pattern. 296 297 """ 298 samples_in_a_period = int(rate / freq) + 1 299 samples_in_golden_pattern = samples_in_a_period + block_size 300 golden_x = numpy.linspace( 301 0.0, (samples_in_golden_pattern - 1) * 1.0 / rate, 302 samples_in_golden_pattern) 303 golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x) 304 return golden_y 305 306 307def _moving_pattern_matching(golden_signal, test_signal, threshold): 308 """Checks if test_signal is similar to any block of golden_signal. 309 310 Compares test signal with each block of golden signal by correlation 311 index. If there is any block of golden signal that is similar to 312 test signal, then it is matched. 313 314 @param golden_signal: A 1-D array for golden signal. 315 @param test_signal: A 1-D array for test signal. 316 @param threshold: The threshold of correlation index to be judge as matched. 317 318 @returns: True if there is a match. False otherwise. 319 320 @raises: ValueError: if test signal is longer than golden signal. 321 322 """ 323 if len(golden_signal) < len(test_signal): 324 raise ValueError('Test signal is longer than golden signal') 325 326 block_length = len(test_signal) 327 number_of_movings = len(golden_signal) - block_length + 1 328 correlation_indices = [] 329 for moving_index in xrange(number_of_movings): 330 # Cuts one block of golden signal from start index. 331 # The block length is the same as test signal. 332 start = moving_index 333 end = start + block_length 334 golden_signal_block = golden_signal[start:end] 335 try: 336 correlation_index = _get_correlation_index( 337 golden_signal_block, test_signal) 338 except TestSignalNormTooSmallError: 339 logging.info('Caught one block of test signal that has no meaningful norm') 340 return False 341 correlation_indices.append(correlation_index) 342 343 # Checks if the maximum correlation index is high enough. 344 max_corr = max(correlation_indices) 345 if max_corr < threshold: 346 logging.debug('Got one unmatched block with max_corr: %s', max_corr) 347 return False 348 return True 349 350 351class GoldenSignalNormTooSmallError(Exception): 352 """Exception when golden signal norm is too small.""" 353 pass 354 355 356class TestSignalNormTooSmallError(Exception): 357 """Exception when test signal norm is too small.""" 358 pass 359 360 361_MINIMUM_SIGNAL_NORM = 0.001 362 363def _get_correlation_index(golden_signal, test_signal): 364 """Computes correlation index of two signal of same length. 365 366 @param golden_signal: An 1-D array-like object. 367 @param test_signal: An 1-D array-like object. 368 369 @raises: ValueError: if two signal have different lengths. 370 @raises: GoldenSignalNormTooSmallError: if golden signal norm is too small 371 @raises: TestSignalNormTooSmallError: if test signal norm is too small. 372 373 @returns: The correlation index. 374 """ 375 if len(golden_signal) != len(test_signal): 376 raise ValueError( 377 'Only accepts signal of same length: %s, %s' % ( 378 len(golden_signal), len(test_signal))) 379 380 norm_golden = numpy.linalg.norm(golden_signal) 381 norm_test = numpy.linalg.norm(test_signal) 382 if norm_golden <= _MINIMUM_SIGNAL_NORM: 383 raise GoldenSignalNormTooSmallError( 384 'No meaningful data as norm is too small.') 385 if norm_test <= _MINIMUM_SIGNAL_NORM: 386 raise TestSignalNormTooSmallError( 387 'No meaningful data as norm is too small.') 388 389 # The 'valid' cross correlation result of two signals of same length will 390 # contain only one number. 391 correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0] 392 return correlation / (norm_golden * norm_test) 393