#!/usr/bin/python2 import logging import numpy import os import unittest import common from autotest_lib.client.cros.audio import audio_analysis from autotest_lib.client.cros.audio import audio_data class SpectralAnalysisTest(unittest.TestCase): def setUp(self): """Uses the same seed to generate noise for each test.""" numpy.random.seed(0) def dummy_peak_detection(self, array, window_size): """Detects peaks in an array in simple way. A point (i, array[i]) is a peak if array[i] is the maximum among array[i - half_window_size] to array[i + half_window_size]. If array[i - half_window_size] to array[i + half_window_size] are all equal, then there is no peak in this window. @param window_size: The window to detect peaks. @returns: A list of tuples: [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...] where the tuples are sorted by peak values. """ half_window_size = window_size / 2 length = len(array) def mid_is_peak(array, mid, left, right): """Checks if value at mid is the largest among left to right. @param array: A list of numbers. @param mid: The mid index. @param left: The left index. @param rigth: The right index. @returns: True if array[index] is the maximum among numbers in array between index [left, right] inclusively. """ value_mid = array[mid] for index in xrange(left, right + 1): if index == mid: continue if array[index] >= value_mid: return False return True results = [] for mid in xrange(length): left = max(0, mid - half_window_size) right = min(length - 1, mid + half_window_size) if mid_is_peak(array, mid, left, right): results.append((mid, array[mid])) # Sort the peaks by values. return sorted(results, key=lambda x: x[1], reverse=True) def testPeakDetection(self): array = [0, 1, 2, 3, 4, 3, 2, 1, 0, 1, 2, 3, 5, 3, 2, 1, 1, 1, 1, 1] result = audio_analysis.peak_detection(array, 4) golden_answer = [(12, 5), (4, 4)] self.assertEqual(result, golden_answer) def testPeakDetectionLarge(self): array = numpy.random.uniform(0, 1, 1000000) window_size = 100 logging.debug('Test large array using dummy peak detection') dummy_answer = self.dummy_peak_detection(array, window_size) logging.debug('Test large array using improved peak detection') improved_answer = audio_analysis.peak_detection(array, window_size) logging.debug('Compare the result') self.assertEqual(dummy_answer, improved_answer) def testSpectralAnalysis(self): rate = 48000 length_in_secs = 0.5 freq_1 = 490.0 freq_2 = 60.0 coeff_1 = 1 coeff_2 = 0.3 samples = length_in_secs * rate noise = numpy.random.standard_normal(samples) * 0.005 x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples) y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) + coeff_2 * numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise results = audio_analysis.spectral_analysis(y, rate) # Results should contains # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant # frequency with coefficient 1, 60Hz is the second dominant frequency # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient # around 0.1. The k constant is resulted from window function. logging.debug('Results: %s', results) self.assertTrue(abs(results[0][0]-freq_1) < 1) self.assertTrue(abs(results[1][0]-freq_2) < 1) self.assertTrue( abs(results[0][1] / results[1][1] - coeff_1 / coeff_2) < 0.01) def testSpectralAnalysisRealData(self): """This unittest checks the spectral analysis works on real data.""" file_path = os.path.join( os.path.dirname(__file__), 'test_data', '1k_2k.raw') binary = open(file_path, 'r').read() data = audio_data.AudioRawData(binary, 2, 'S32_LE') saturate_value = audio_data.get_maximum_value_from_sample_format( 'S32_LE') golden_frequency = [1000, 2000] for channel in [0, 1]: normalized_signal = audio_analysis.normalize_signal( data.channel_data[channel],saturate_value) spectral = audio_analysis.spectral_analysis( normalized_signal, 48000, 0.02) logging.debug('channel %s: %s', channel, spectral) self.assertTrue(abs(spectral[0][0] - golden_frequency[channel]) < 5, 'Dominant frequency is not correct') def testNotMeaningfulData(self): """Checks that sepectral analysis handles un-meaningful data.""" rate = 48000 length_in_secs = 0.5 samples = length_in_secs * rate noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5 noise = numpy.random.standard_normal(samples) * noise_amplitude results = audio_analysis.spectral_analysis(noise, rate) self.assertEqual([(0, 0)], results) def testEmptyData(self): """Checks that sepectral analysis rejects empty data.""" with self.assertRaises(audio_analysis.EmptyDataError): results = audio_analysis.spectral_analysis([], 100) class NormalizeTest(unittest.TestCase): def testNormalize(self): y = [1, 2, 3, 4, 5] normalized_y = audio_analysis.normalize_signal(y, 10) expected = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5]) for i in xrange(len(y)): self.assertEqual(expected[i], normalized_y[i]) class AnomalyTest(unittest.TestCase): def setUp(self): """Creates a test signal of sine wave.""" # Use the same seed for each test case. numpy.random.seed(0) self.block_size = 120 self.rate = 48000 self.freq = 440 length_in_secs = 0.25 self.samples = length_in_secs * self.rate x = numpy.linspace( 0.0, (self.samples - 1) * 1.0 / self.rate, self.samples) self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x) def add_noise(self): """Add noise to the test signal.""" noise_amplitude = 0.3 noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude self.y = self.y + noise def insert_anomaly(self): """Inserts an anomaly to the test signal. The anomaly self.anomaly_samples should be created before calling this method. """ self.anomaly_start_secs = 0.1 self.y = numpy.insert(self.y, int(self.anomaly_start_secs * self.rate), self.anomaly_samples) def generate_skip_anomaly(self): """Skips a section of test signal.""" self.anomaly_start_secs = 0.1 self.anomaly_duration_secs = 0.005 anomaly_append_secs = self.anomaly_start_secs + self.anomaly_duration_secs anomaly_start_index = self.anomaly_start_secs * self.rate anomaly_append_index = anomaly_append_secs * self.rate self.y = numpy.append(self.y[:anomaly_start_index], self.y[anomaly_append_index:]) def create_constant_anomaly(self, amplitude): """Creates an anomaly of constant samples. @param amplitude: The amplitude of the constant samples. """ self.anomaly_duration_secs = 0.005 self.anomaly_samples = ( [amplitude] * int(self.anomaly_duration_secs * self.rate)) def run_analysis(self): """Runs the anomaly detection.""" self.results = audio_analysis.anomaly_detection( self.y, self.rate, self.freq, self.block_size) logging.debug('Results: %s', self.results) def check_no_anomaly(self): """Verifies that there is no anomaly in detection result.""" self.run_analysis() self.assertFalse(self.results) def check_anomaly(self): """Verifies that there is anomaly in detection result. The detection result should contain anomaly time stamps that are close to where anomaly was inserted. There can be multiple anomalies since the detection depends on the block size. """ self.run_analysis() self.assertTrue(self.results) # Anomaly can be detected as long as the detection window of block size # overlaps with anomaly. expected_detected_range_secs = ( self.anomaly_start_secs - float(self.block_size) / self.rate, self.anomaly_start_secs + self.anomaly_duration_secs) for detected_secs in self.results: self.assertTrue(detected_secs <= expected_detected_range_secs[1]) self.assertTrue(detected_secs >= expected_detected_range_secs[0] ) def testGoodSignal(self): """Sine wave signal with no noise or anomaly.""" self.check_no_anomaly() def testGoodSignalNoise(self): """Sine wave signal with noise.""" self.add_noise() self.check_no_anomaly() def testZeroAnomaly(self): """Sine wave signal with no noise but with anomaly. This test case simulates underrun in digital data where there will be one block of samples with 0 amplitude. """ self.create_constant_anomaly(0) self.insert_anomaly() self.check_anomaly() def testZeroAnomalyNoise(self): """Sine wave signal with noise and anomaly. This test case simulates underrun in analog data where there will be one block of samples with amplitudes close to 0. """ self.create_constant_anomaly(0) self.insert_anomaly() self.add_noise() self.check_anomaly() def testLowConstantAnomaly(self): """Sine wave signal with low constant anomaly. The anomaly is one block of constant values. """ self.create_constant_anomaly(0.05) self.insert_anomaly() self.check_anomaly() def testLowConstantAnomalyNoise(self): """Sine wave signal with low constant anomaly and noise. The anomaly is one block of constant values. """ self.create_constant_anomaly(0.05) self.insert_anomaly() self.add_noise() self.check_anomaly() def testHighConstantAnomaly(self): """Sine wave signal with high constant anomaly. The anomaly is one block of constant values. """ self.create_constant_anomaly(2) self.insert_anomaly() self.check_anomaly() def testHighConstantAnomalyNoise(self): """Sine wave signal with high constant anomaly and noise. The anomaly is one block of constant values. """ self.create_constant_anomaly(2) self.insert_anomaly() self.add_noise() self.check_anomaly() def testSkippedAnomaly(self): """Sine wave signal with skipped anomaly. The anomaly simulates the symptom where a block is skipped. """ self.generate_skip_anomaly() self.check_anomaly() def testSkippedAnomalyNoise(self): """Sine wave signal with skipped anomaly with noise. The anomaly simulates the symptom where a block is skipped. """ self.generate_skip_anomaly() self.add_noise() self.check_anomaly() def testEmptyData(self): """Checks that anomaly detection rejects empty data.""" self.y = [] with self.assertRaises(audio_analysis.EmptyDataError): self.check_anomaly() if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') unittest.main()