# Copyright 2015 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """This module provides utilities to do audio data analysis.""" import logging import numpy import operator # Only peaks with coefficient greater than 0.01 of the first peak should be # considered. Note that this correspond to -40dB in the spectrum. DEFAULT_MIN_PEAK_RATIO = 0.01 PEAK_WINDOW_SIZE_HZ = 20 # Window size for peak detection. # The minimum RMS value of meaningful audio data. MEANINGFUL_RMS_THRESHOLD = 0.001 class RMSTooSmallError(Exception): """Error when signal RMS is too small.""" pass class EmptyDataError(Exception): """Error when signal is empty.""" def normalize_signal(signal, saturate_value): """Normalizes the signal with respect to the saturate value. @param signal: A list for one-channel PCM data. @param saturate_value: The maximum value that the PCM data might be. @returns: A numpy array containing normalized signal. The normalized signal has value -1 and 1 when it saturates. """ signal = numpy.array(signal) return signal / float(saturate_value) def spectral_analysis(signal, rate, min_peak_ratio=DEFAULT_MIN_PEAK_RATIO, peak_window_size_hz=PEAK_WINDOW_SIZE_HZ): """Gets the dominant frequencies by spectral analysis. @param signal: A list of numbers for one-channel PCM data. This should be normalized to [-1, 1] so the function can check if signal RMS is too small to be meaningful. @param rate: Sampling rate. @param min_peak_ratio: The minimum peak_0/peak_i ratio such that the peaks other than the greatest one should be considered. This is to ignore peaks that are too small compared to the first peak peak_0. @param peak_window_size_hz: The window size in Hz to find the peaks. The minimum differences between found peaks will be half of this value. @returns: A list of tuples: [(peak_frequency_0, peak_coefficient_0), (peak_frequency_1, peak_coefficient_1), (peak_frequency_2, peak_coefficient_2), ...] where the tuples are sorted by coefficients. The last peak_coefficient will be no less than peak_coefficient * min_peak_ratio. If RMS is less than MEANINGFUL_RMS_THRESHOLD, returns [(0, 0)]. """ # Checks the signal is meaningful. if len(signal) == 0: raise EmptyDataError('Signal data is empty') signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal)) logging.debug('signal RMS = %s', signal_rms) # If RMS is too small, set dominant frequency and coefficient to 0. if signal_rms < MEANINGFUL_RMS_THRESHOLD: logging.warning( 'RMS %s is too small to be meaningful. Set frequency to 0.', signal_rms) return [(0, 0)] logging.debug('Doing spectral analysis ...') # First, pass signal through a window function to mitigate spectral leakage. y_conv_w = signal * numpy.hanning(len(signal)) length = len(y_conv_w) # x_f is the frequency in Hz, y_f is the transformed coefficient. x_f = _rfft_freq(length, rate) y_f = 2.0 / length * numpy.fft.rfft(y_conv_w) # y_f is complex so consider its absolute value for magnitude. abs_y_f = numpy.abs(y_f) threshold = max(abs_y_f) * min_peak_ratio # Suppresses all coefficients that are below threshold. for i in xrange(len(abs_y_f)): if abs_y_f[i] < threshold: abs_y_f[i] = 0 # Gets the peak detection window size in indice. # x_f[1] is the frequency difference per index. peak_window_size = int(peak_window_size_hz / x_f[1]) # Detects peaks. peaks = peak_detection(abs_y_f, peak_window_size) # Transform back the peak location from index to frequency. results = [] for index, value in peaks: results.append((x_f[index], value)) return results def _rfft_freq(length, rate): """Gets the frequency at each index of real FFT. @param length: The window length of FFT. @param rate: Sampling rate. @returns: A numpy array containing frequency corresponding to numpy.fft.rfft result at each index. """ # The difference in Hz between each index. val = rate / float(length) # Only care half of frequencies for FFT on real signal. result_length = length // 2 + 1 return numpy.linspace(0, (result_length - 1) * val, result_length) def peak_detection(array, window_size): """Detects peaks in an array. A point (i, array[i]) is a peak if array[i] is the maximum among array[i - half_window_size] to array[i + half_window_size]. If array[i - half_window_size] to array[i + half_window_size] are all equal, then there is no peak in this window. Note that we only consider peak with value greater than 0. @param window_size: The window to detect peaks. @returns: A list of tuples: [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...] where the tuples are sorted by peak values. """ half_window_size = window_size / 2 length = len(array) def mid_is_peak(array, mid, left, right): """Checks if value at mid is the largest among left to right in array. @param array: A list of numbers. @param mid: The mid index. @param left: The left index. @param rigth: The right index. @returns: A tuple (is_peak, next_candidate) is_peak is True if array[index] is the maximum among numbers in array between index [left, right] inclusively. next_candidate is the index of next candidate for peak if is_peak is False. It is the index of maximum value in [mid + 1, right]. If is_peak is True, next_candidate is right + 1. """ value_mid = array[mid] is_peak = True next_peak_candidate_index = None # Check the left half window. for index in xrange(left, mid): if array[index] >= value_mid: is_peak = False break # Mid is at the end of array. if mid == right: return is_peak, right + 1 # Check the right half window and also record next candidate. # Favor the larger index for next_peak_candidate_index. for index in xrange(right, mid, -1): if (next_peak_candidate_index is None or array[index] > array[next_peak_candidate_index]): next_peak_candidate_index = index if array[next_peak_candidate_index] >= value_mid: is_peak = False if is_peak: next_peak_candidate_index = right + 1 return is_peak, next_peak_candidate_index results = [] mid = 0 next_candidate_idx = None while mid < length: left = max(0, mid - half_window_size) right = min(length - 1, mid + half_window_size) # Only consider value greater than 0. if array[mid] == 0: mid = mid + 1 continue; is_peak, next_candidate_idx = mid_is_peak(array, mid, left, right) if is_peak: results.append((mid, array[mid])) # Use the next candidate found in [mid + 1, right], or right + 1. mid = next_candidate_idx # Sort the peaks by values. return sorted(results, key=lambda x: x[1], reverse=True) # The default pattern mathing threshold. By experiment, this threshold # can tolerate normal noise of 0.3 amplitude when sine wave signal # amplitude is 1. PATTERN_MATCHING_THRESHOLD = 0.85 # The default block size of pattern matching. ANOMALY_DETECTION_BLOCK_SIZE = 120 def anomaly_detection(signal, rate, freq, block_size=ANOMALY_DETECTION_BLOCK_SIZE, threshold=PATTERN_MATCHING_THRESHOLD): """Detects anomaly in a sine wave signal. This method detects anomaly in a sine wave signal by matching patterns of each block. For each moving window of block in the test signal, checks if there is any block in golden signal that is similar to this block of test signal. If there is such a block in golden signal, then this block of test signal is matched and there is no anomaly in this block of test signal. If there is any block in test signal that is not matched, then this block covers an anomaly. The block of test signal starts from index 0, and proceeds in steps of half block size. The overlapping of test signal blocks makes sure there must be at least one block covering the transition from sine wave to anomaly. @param signal: A 1-D array-like object for 1-channel PCM data. @param rate: The sampling rate. @param freq: The expected frequency of signal. @param block_size: The block size in samples to detect anomaly. @param threshold: The threshold of correlation index to be judge as matched. @returns: A list containing detected anomaly time in seconds. """ if len(signal) == 0: raise EmptyDataError('Signal data is empty') golden_y = _generate_golden_pattern(rate, freq, block_size) results = [] for start in xrange(0, len(signal), block_size / 2): end = start + block_size test_signal = signal[start:end] matched = _moving_pattern_matching(golden_y, test_signal, threshold) if not matched: results.append(start) results = [float(x) / rate for x in results] return results def _generate_golden_pattern(rate, freq, block_size): """Generates a golden pattern of certain frequency. The golden pattern must cover all the possibilities of waveforms in a block. So, we need a golden pattern covering 1 period + 1 block size, such that the test block can start anywhere in a period, and extends a block size. |period |1 bk| | | | . . . . . . . . . . . @param rate: The sampling rate. @param freq: The frequency of golden pattern. @param block_size: The block size in samples to detect anomaly. @returns: A 1-D array for golden pattern. """ samples_in_a_period = int(rate / freq) + 1 samples_in_golden_pattern = samples_in_a_period + block_size golden_x = numpy.linspace( 0.0, (samples_in_golden_pattern - 1) * 1.0 / rate, samples_in_golden_pattern) golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x) return golden_y def _moving_pattern_matching(golden_signal, test_signal, threshold): """Checks if test_signal is similar to any block of golden_signal. Compares test signal with each block of golden signal by correlation index. If there is any block of golden signal that is similar to test signal, then it is matched. @param golden_signal: A 1-D array for golden signal. @param test_signal: A 1-D array for test signal. @param threshold: The threshold of correlation index to be judge as matched. @returns: True if there is a match. False otherwise. @raises: ValueError: if test signal is longer than golden signal. """ if len(golden_signal) < len(test_signal): raise ValueError('Test signal is longer than golden signal') block_length = len(test_signal) number_of_movings = len(golden_signal) - block_length + 1 correlation_indices = [] for moving_index in xrange(number_of_movings): # Cuts one block of golden signal from start index. # The block length is the same as test signal. start = moving_index end = start + block_length golden_signal_block = golden_signal[start:end] try: correlation_index = _get_correlation_index( golden_signal_block, test_signal) except TestSignalNormTooSmallError: logging.info('Caught one block of test signal that has no meaningful norm') return False correlation_indices.append(correlation_index) # Checks if the maximum correlation index is high enough. max_corr = max(correlation_indices) if max_corr < threshold: logging.debug('Got one unmatched block with max_corr: %s', max_corr) return False return True class GoldenSignalNormTooSmallError(Exception): """Exception when golden signal norm is too small.""" pass class TestSignalNormTooSmallError(Exception): """Exception when test signal norm is too small.""" pass _MINIMUM_SIGNAL_NORM = 0.001 def _get_correlation_index(golden_signal, test_signal): """Computes correlation index of two signal of same length. @param golden_signal: An 1-D array-like object. @param test_signal: An 1-D array-like object. @raises: ValueError: if two signal have different lengths. @raises: GoldenSignalNormTooSmallError: if golden signal norm is too small @raises: TestSignalNormTooSmallError: if test signal norm is too small. @returns: The correlation index. """ if len(golden_signal) != len(test_signal): raise ValueError( 'Only accepts signal of same length: %s, %s' % ( len(golden_signal), len(test_signal))) norm_golden = numpy.linalg.norm(golden_signal) norm_test = numpy.linalg.norm(test_signal) if norm_golden <= _MINIMUM_SIGNAL_NORM: raise GoldenSignalNormTooSmallError( 'No meaningful data as norm is too small.') if norm_test <= _MINIMUM_SIGNAL_NORM: raise TestSignalNormTooSmallError( 'No meaningful data as norm is too small.') # The 'valid' cross correlation result of two signals of same length will # contain only one number. correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0] return correlation / (norm_golden * norm_test)