#!/usr/bin/python
"""Unit tests for the audio_analysis module."""

import logging
import numpy
import os
import unittest

import common
from autotest_lib.client.cros.audio import audio_analysis
from autotest_lib.client.cros.audio import audio_data


class SpectralAnalysisTest(unittest.TestCase):
    """Tests for peak detection and spectral analysis."""

    def setUp(self):
        """Uses the same seed to generate noise for each test."""
        numpy.random.seed(0)


    def dummy_peak_detection(self, array, window_size):
        """Detects peaks in an array in simple way.

        A point (i, array[i]) is a peak if array[i] is strictly larger than
        every other value among array[i - half_window_size] to
        array[i + half_window_size] (the window is clipped at the array
        boundaries). If all the values in that window are equal, then there
        is no peak in this window.

        @param array: A sequence of numbers.
        @param window_size: The window to detect peaks.

        @returns: A list of tuples:
                  [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2),
                   ...]
                  where the tuples are sorted by peak values in descending
                  order.

        """
        # Floor division keeps the half window an integer so that it can be
        # used as an index offset regardless of the division semantics.
        half_window_size = window_size // 2
        length = len(array)

        def mid_is_peak(array, mid, left, right):
            """Checks if value at mid is the largest among left to right.

            @param array: A list of numbers.
            @param mid: The mid index.
            @param left: The left index.
            @param right: The right index.

            @returns: True if array[mid] is strictly larger than all other
                      numbers in array between index [left, right]
                      inclusively.

            """
            value_mid = array[mid]
            return all(array[index] < value_mid
                       for index in range(left, right + 1)
                       if index != mid)

        results = []
        for mid, value in enumerate(array):
            left = max(0, mid - half_window_size)
            right = min(length - 1, mid + half_window_size)
            if mid_is_peak(array, mid, left, right):
                results.append((mid, value))

        # Sort the peaks by values.
        return sorted(results, key=lambda x: x[1], reverse=True)


    def testPeakDetection(self):
        """Checks peak detection on a small hand-written array."""
        array = [0, 1, 2, 3, 4, 3, 2, 1, 0, 1, 2, 3, 5, 3, 2, 1, 1, 1, 1, 1]
        result = audio_analysis.peak_detection(array, 4)
        golden_answer = [(12, 5), (4, 4)]
        self.assertEqual(result, golden_answer)


    def testPeakDetectionLarge(self):
        """Compares peak detection with the simple reference on random data."""
        array = numpy.random.uniform(0, 1, 1000000)
        window_size = 100
        logging.debug('Test large array using dummy peak detection')
        dummy_answer = self.dummy_peak_detection(array, window_size)
        logging.debug('Test large array using improved peak detection')
        improved_answer = audio_analysis.peak_detection(array, window_size)
        logging.debug('Compare the result')
        self.assertEqual(dummy_answer, improved_answer)


    def testSpectralAnalysis(self):
        """Checks the dominant frequencies of a noisy two-tone signal."""
        rate = 48000
        length_in_secs = 0.5
        freq_1 = 490.0
        freq_2 = 60.0
        coeff_1 = 1
        coeff_2 = 0.3
        # The sample count is used as an array size, so it must be an
        # integer.
        samples = int(length_in_secs * rate)
        noise = numpy.random.standard_normal(samples) * 0.005
        x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples)
        y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) +
             coeff_2 * numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise
        results = audio_analysis.spectral_analysis(y, rate)
        # Results should contain
        # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant
        # frequency with coefficient 1, 60Hz is the second dominant frequency
        # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient
        # around 0.1. The constant k comes from the window function.
        logging.debug('Results: %s', results)
        self.assertLess(abs(results[0][0] - freq_1), 1)
        self.assertLess(abs(results[1][0] - freq_2), 1)
        self.assertLess(
                abs(results[0][1] / results[1][1] - coeff_1 / coeff_2), 0.01)


    def testSpectralAnalysisRealData(self):
        """This unittest checks the spectral analysis works on real data."""
        file_path = os.path.join(
                os.path.dirname(__file__), 'test_data', '1k_2k.raw')
        # The file holds raw audio samples: read it in binary mode and make
        # sure the file handle is closed.
        with open(file_path, 'rb') as raw_file:
            binary = raw_file.read()
        data = audio_data.AudioRawData(binary, 2, 'S32_LE')
        saturate_value = audio_data.get_maximum_value_from_sample_format(
                'S32_LE')
        golden_frequency = [1000, 2000]
        for channel in [0, 1]:
            normalized_signal = audio_analysis.normalize_signal(
                    data.channel_data[channel], saturate_value)
            spectral = audio_analysis.spectral_analysis(
                    normalized_signal, 48000, 0.02)
            logging.debug('channel %s: %s', channel, spectral)
            self.assertLess(
                    abs(spectral[0][0] - golden_frequency[channel]), 5,
                    'Dominant frequency is not correct')


    def testNotMeaningfulData(self):
        """Checks that spectral analysis handles un-meaningful data."""
        rate = 48000
        length_in_secs = 0.5
        # The sample count is used as an array size, so it must be an
        # integer.
        samples = int(length_in_secs * rate)
        noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5
        noise = numpy.random.standard_normal(samples) * noise_amplitude
        results = audio_analysis.spectral_analysis(noise, rate)
        self.assertEqual([(0, 0)], results)


    def testEmptyData(self):
        """Checks that spectral analysis rejects empty data."""
        with self.assertRaises(audio_analysis.EmptyDataError):
            audio_analysis.spectral_analysis([], 100)


class NormalizeTest(unittest.TestCase):
    """Tests for signal normalization."""

    def testNormalize(self):
        """Checks that each sample is scaled by the saturate value."""
        y = [1, 2, 3, 4, 5]
        normalized_y = audio_analysis.normalize_signal(y, 10)
        expected = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5])
        # Guard the length first so that zip() cannot hide missing samples.
        self.assertEqual(len(y), len(normalized_y))
        for expected_value, normalized_value in zip(expected, normalized_y):
            self.assertEqual(expected_value, normalized_value)


class AnomalyTest(unittest.TestCase):
    """Tests for anomaly detection on a sine wave signal."""

    def setUp(self):
        """Creates a test signal of sine wave."""
        # Use the same seed for each test case.
        numpy.random.seed(0)

        self.block_size = 120
        self.rate = 48000
        self.freq = 440
        length_in_secs = 0.25
        # The sample count is used as an array size, so it must be an
        # integer.
        self.samples = int(length_in_secs * self.rate)
        x = numpy.linspace(
                0.0, (self.samples - 1) * 1.0 / self.rate, self.samples)
        self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x)


    def add_noise(self):
        """Add noise to the test signal."""
        noise_amplitude = 0.3
        noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude
        self.y = self.y + noise


    def insert_anomaly(self):
        """Inserts an anomaly to the test signal.

        The anomaly self.anomaly_samples should be created before calling this
        method.

        """
        self.anomaly_start_secs = 0.1
        self.y = numpy.insert(self.y, int(self.anomaly_start_secs * self.rate),
                              self.anomaly_samples)


    def generate_skip_anomaly(self):
        """Skips a section of test signal."""
        self.anomaly_start_secs = 0.1
        self.anomaly_duration_secs = 0.005
        anomaly_append_secs = (
                self.anomaly_start_secs + self.anomaly_duration_secs)
        # Slice indices must be integers.
        anomaly_start_index = int(self.anomaly_start_secs * self.rate)
        anomaly_append_index = int(anomaly_append_secs * self.rate)
        self.y = numpy.append(self.y[:anomaly_start_index],
                              self.y[anomaly_append_index:])


    def create_constant_anomaly(self, amplitude):
        """Creates an anomaly of constant samples.

        @param amplitude: The amplitude of the constant samples.

        """
        self.anomaly_duration_secs = 0.005
        self.anomaly_samples = (
                [amplitude] * int(self.anomaly_duration_secs * self.rate))


    def run_analysis(self):
        """Runs the anomaly detection."""
        self.results = audio_analysis.anomaly_detection(
                self.y, self.rate, self.freq, self.block_size)
        logging.debug('Results: %s', self.results)


    def check_no_anomaly(self):
        """Verifies that there is no anomaly in detection result."""
        self.run_analysis()
        self.assertFalse(self.results)


    def check_anomaly(self):
        """Verifies that there is anomaly in detection result.

        The detection result should contain anomaly time stamps that are
        close to where anomaly was inserted. There can be multiple anomalies
        since the detection depends on the block size.

        """
        self.run_analysis()
        self.assertTrue(self.results)
        # Anomaly can be detected as long as the detection window of block size
        # overlaps with anomaly.
        earliest_secs = (
                self.anomaly_start_secs - float(self.block_size) / self.rate)
        latest_secs = self.anomaly_start_secs + self.anomaly_duration_secs
        for detected_secs in self.results:
            self.assertLessEqual(detected_secs, latest_secs)
            self.assertGreaterEqual(detected_secs, earliest_secs)


    def testGoodSignal(self):
        """Sine wave signal with no noise or anomaly."""
        self.check_no_anomaly()


    def testGoodSignalNoise(self):
        """Sine wave signal with noise."""
        self.add_noise()
        self.check_no_anomaly()


    def testZeroAnomaly(self):
        """Sine wave signal with no noise but with anomaly.

        This test case simulates underrun in digital data where there will be
        one block of samples with 0 amplitude.

        """
        self.create_constant_anomaly(0)
        self.insert_anomaly()
        self.check_anomaly()


    def testZeroAnomalyNoise(self):
        """Sine wave signal with noise and anomaly.

        This test case simulates underrun in analog data where there will be
        one block of samples with amplitudes close to 0.

        """
        self.create_constant_anomaly(0)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()


    def testLowConstantAnomaly(self):
        """Sine wave signal with low constant anomaly.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(0.05)
        self.insert_anomaly()
        self.check_anomaly()


    def testLowConstantAnomalyNoise(self):
        """Sine wave signal with low constant anomaly and noise.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(0.05)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()


    def testHighConstantAnomaly(self):
        """Sine wave signal with high constant anomaly.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(2)
        self.insert_anomaly()
        self.check_anomaly()


    def testHighConstantAnomalyNoise(self):
        """Sine wave signal with high constant anomaly and noise.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(2)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()


    def testSkippedAnomaly(self):
        """Sine wave signal with skipped anomaly.

        The anomaly simulates the symptom where a block is skipped.

        """
        self.generate_skip_anomaly()
        self.check_anomaly()


    def testSkippedAnomalyNoise(self):
        """Sine wave signal with skipped anomaly with noise.

        The anomaly simulates the symptom where a block is skipped.

        """
        self.generate_skip_anomaly()
        self.add_noise()
        self.check_anomaly()


    def testEmptyData(self):
        """Checks that anomaly detection rejects empty data."""
        self.y = []
        # Only the detection call itself is expected to raise.
        with self.assertRaises(audio_analysis.EmptyDataError):
            self.run_analysis()


if __name__ == '__main__':
    logging.basicConfig(
            level=logging.DEBUG,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    unittest.main()