#!/usr/bin/env python3
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module provides utilities to do audio data analysis."""

import logging
import numpy
import soundfile
from scipy.signal import blackmanharris
from scipy.signal import iirnotch
from scipy.signal import lfilter

# The default block size of pattern matching.
ANOMALY_DETECTION_BLOCK_SIZE = 120

# Only peaks with coefficient greater than 0.01 of the first peak should be
# considered. Note that this corresponds to -40dB in the spectrum.
DEFAULT_MIN_PEAK_RATIO = 0.01

# The minimum RMS value of meaningful audio data.
MEANINGFUL_RMS_THRESHOLD = 0.001

# The minimal signal norm value.
_MINIMUM_SIGNAL_NORM = 0.001

# The default pattern matching threshold. By experiment, this threshold
# can tolerate normal noise of 0.3 amplitude when sine wave signal
# amplitude is 1.
PATTERN_MATCHING_THRESHOLD = 0.85

# The default tolerance, in samples, added to the analysis step size
# (half the block size) when deciding whether two anomaly time values
# belong to the same anomaly.
ANOMALY_GROUPING_TOLERANCE = 1.0

# Window size for peak detection.
PEAK_WINDOW_SIZE_HZ = 20


class RMSTooSmallError(Exception):
    """Error when signal RMS is too small."""
    pass


class EmptyDataError(Exception):
    """Error when signal is empty."""
    pass


def normalize_signal(signal, saturate_value):
    """Normalizes the signal with respect to the saturate value.

    Args:
        signal: A list for one-channel PCM data.
        saturate_value: The maximum value that the PCM data might be.

    Returns:
        A numpy array containing the normalized signal. The normalized signal
        has value -1 or 1 when it saturates.

    """
    signal = numpy.array(signal)
    return signal / float(saturate_value)


def spectral_analysis(signal,
                      rate,
                      min_peak_ratio=DEFAULT_MIN_PEAK_RATIO,
                      peak_window_size_hz=PEAK_WINDOW_SIZE_HZ):
    """Gets the dominant frequencies by spectral analysis.

    Args:
        signal: A list of numbers for one-channel PCM data. This should be
                normalized to [-1, 1] so the function can check if signal RMS
                is too small to be meaningful.
        rate: Sampling rate in samples per second. Example inputs: 44100,
              48000
        min_peak_ratio: The minimum peak_i/peak_0 ratio such that the
                        peaks other than the greatest one should be
                        considered.
                        This is to ignore peaks that are too small compared
                        to the first peak peak_0.
        peak_window_size_hz: The window size in Hz to find the peaks.
                             The minimum differences between found peaks will
                             be half of this value.

    Returns:
        A list of tuples:
              [(peak_frequency_0, peak_coefficient_0),
               (peak_frequency_1, peak_coefficient_1),
               (peak_frequency_2, peak_coefficient_2), ...]
        where the tuples are sorted by coefficients. The last
        peak_coefficient will be no less than peak_coefficient_0 *
        min_peak_ratio. If the signal RMS is less than
        MEANINGFUL_RMS_THRESHOLD, returns [(0, 0)].

    """
    # Checks the signal is meaningful.
    if len(signal) == 0:
        raise EmptyDataError('Signal data is empty')

    signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal))
    logging.debug('signal RMS = %s', signal_rms)

    # If RMS is too small, set dominant frequency and coefficient to 0.
    if signal_rms < MEANINGFUL_RMS_THRESHOLD:
        logging.warning(
            'RMS %s is too small to be meaningful. Set frequency to 0.',
            signal_rms)
        return [(0, 0)]

    logging.debug('Doing spectral analysis ...')

    # First, pass signal through a window function to mitigate spectral
    # leakage.
    y_conv_w = signal * numpy.hanning(len(signal))

    length = len(y_conv_w)

    # x_f is the frequency in Hz, y_f is the transformed coefficient.
    x_f = _rfft_freq(length, rate)
    y_f = 2.0 / length * numpy.fft.rfft(y_conv_w)

    # y_f is complex so consider its absolute value for magnitude.
    abs_y_f = numpy.abs(y_f)
    threshold = max(abs_y_f) * min_peak_ratio

    # Suppresses all coefficients that are below threshold.
    for i in range(len(abs_y_f)):
        if abs_y_f[i] < threshold:
            abs_y_f[i] = 0

    # Gets the peak detection window size in indices.
    # x_f[1] is the frequency difference per index.
    peak_window_size = int(peak_window_size_hz / x_f[1])

    # Detects peaks.
    peaks = peak_detection(abs_y_f, peak_window_size)

    # Transform back the peak location from index to frequency.
    results = []
    for index, value in peaks:
        results.append((x_f[int(index)], value))
    return results
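

# --- Illustrative usage (not part of the original module) ---
# A minimal sketch, using a synthetic two-tone test signal, of how
# spectral_analysis can be exercised. The helper name
# _example_spectral_analysis and the chosen tone frequencies are
# hypothetical and exist only for illustration.
def _example_spectral_analysis():
    """Returns the dominant peaks of a synthetic 1 kHz + 3 kHz signal."""
    rate = 48000
    t = numpy.arange(rate) / float(rate)  # 1 second of samples.
    # A 1 kHz tone at amplitude 0.8 plus a weaker 3 kHz tone at 0.2.
    signal = (0.8 * numpy.sin(2 * numpy.pi * 1000 * t) +
              0.2 * numpy.sin(2 * numpy.pi * 3000 * t))
    # Expect peaks near 1000 Hz and 3000 Hz, sorted by coefficient.
    return spectral_analysis(signal, rate)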


def _rfft_freq(length, rate):
    """Gets the frequency at each index of real FFT.

    Args:
        length: The window length of FFT.
        rate: Sampling rate in samples per second. Example inputs: 44100,
              48000

    Returns:
        A numpy array containing the frequency corresponding to the
        numpy.fft.rfft result at each index.

    """
    # The difference in Hz between each index.
    val = rate / float(length)
    # Only care about half of the frequencies for FFT on a real signal.
    result_length = length // 2 + 1
    return numpy.linspace(0, (result_length - 1) * val, result_length)


def peak_detection(array, window_size):
    """Detects peaks in an array.

    A point (i, array[i]) is a peak if array[i] is the maximum among
    array[i - half_window_size] to array[i + half_window_size].
    If array[i - half_window_size] to array[i + half_window_size] are all
    equal, then there is no peak in this window.
    Note that we only consider a peak with value greater than 0.

    Args:
        array: The input array to detect peaks in. Array is a list of
               absolute values of the magnitude of transformed coefficient.

        window_size: The window to detect peaks.

    Returns:
        A list of tuples:
              [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...]
              where the tuples are sorted by peak values.

    """
    half_window_size = window_size / 2
    length = len(array)

    def mid_is_peak(array, mid, left, right):
        """Checks if value at mid is the largest among left to right in array.

        Args:
            array: A list of numbers.
            mid: The mid index.
            left: The left index.
            right: The right index.

        Returns:
            A tuple (is_peak, next_candidate)
            is_peak is True if array[mid] is the maximum among numbers
            in array between index [left, right] inclusively.
            next_candidate is the index of the next candidate for peak if
            is_peak is False. It is the index of the maximum value in
            [mid + 1, right]. If is_peak is True, next_candidate is
            right + 1.

        """
        value_mid = array[int(mid)]
        is_peak = True
        next_peak_candidate_index = None

        # Check the left half window.
        for index in range(int(left), int(mid)):
            if array[index] >= value_mid:
                is_peak = False
                break

        # Mid is at the end of array.
        if mid == right:
            return is_peak, right + 1

        # Check the right half window and also record next candidate.
        # Favor the larger index for next_peak_candidate_index.
        for index in range(int(right), int(mid), -1):
            if (next_peak_candidate_index is None or
                    array[index] > array[next_peak_candidate_index]):
                next_peak_candidate_index = index

        if array[next_peak_candidate_index] >= value_mid:
            is_peak = False

        if is_peak:
            next_peak_candidate_index = right + 1

        return is_peak, next_peak_candidate_index

    results = []
    mid = 0
    next_candidate_idx = None
    while mid < length:
        left = max(0, mid - half_window_size)
        right = min(length - 1, mid + half_window_size)

        # Only consider value greater than 0.
        if array[int(mid)] == 0:
            mid = mid + 1
            continue

        is_peak, next_candidate_idx = mid_is_peak(array, mid, left, right)

        if is_peak:
            results.append((mid, array[int(mid)]))

        # Use the next candidate found in [mid + 1, right], or right + 1.
        mid = next_candidate_idx

    # Sort the peaks by values.
    return sorted(results, key=lambda x: x[1], reverse=True)
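

# --- Illustrative usage (not part of the original module) ---
# A minimal sketch of peak_detection on a hand-written magnitude array. The
# helper name _example_peak_detection is hypothetical; it only illustrates
# that within each window of +/- window_size / 2 indices, the single largest
# positive value is reported as a peak.
def _example_peak_detection():
    """Returns peaks of a small magnitude array with window size 4."""
    magnitudes = [0, 1, 5, 2, 0, 0, 3, 9, 3, 0, 0, 0]
    # Expect peaks at indices 7 (value 9) and 2 (value 5), sorted by peak
    # value in descending order.
    return peak_detection(magnitudes, window_size=4)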


def anomaly_detection(signal,
                      rate,
                      freq,
                      block_size=ANOMALY_DETECTION_BLOCK_SIZE,
                      threshold=PATTERN_MATCHING_THRESHOLD):
    """Detects anomalies in a sine wave signal.

    This method detects anomalies in a sine wave signal by matching patterns
    of each block.
    For each moving window of block in the test signal, checks if there is
    any block in the golden signal that is similar to this block of the test
    signal. If there is such a block in the golden signal, then this block of
    the test signal is matched and there is no anomaly in this block. If
    there is any block in the test signal that is not matched, then this
    block covers an anomaly.
    The block of the test signal starts from index 0 and proceeds in steps of
    half the block size. The overlapping of test signal blocks makes sure
    there must be at least one block covering the transition from sine wave
    to anomaly.

    Args:
        signal: A 1-D array-like object for 1-channel PCM data.
        rate: Sampling rate in samples per second. Example inputs: 44100,
              48000
        freq: The expected frequency of signal.
        block_size: The block size in samples to detect anomaly.
        threshold: The threshold of correlation index to be judged as
                   matched.

    Returns:
        A list containing time markers in seconds that have an anomaly within
        block_size samples.

    """
    if len(signal) == 0:
        raise EmptyDataError('Signal data is empty')

    golden_y = _generate_golden_pattern(rate, freq, block_size)

    results = []

    for start in range(0, len(signal), int(block_size / 2)):
        end = start + block_size
        test_signal = signal[start:end]
        matched = _moving_pattern_matching(golden_y, test_signal, threshold)
        if not matched:
            results.append(start)

    results = [float(x) / rate for x in results]

    return results
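

# --- Illustrative usage (not part of the original module) ---
# A minimal sketch, assuming a synthetic 1 kHz sine with a short dropout, of
# how anomaly_detection reports the anomaly location. The helper name
# _example_anomaly_detection is hypothetical and exists only for
# illustration.
def _example_anomaly_detection():
    """Returns anomaly time markers for a sine wave with a muted gap."""
    rate = 48000
    freq = 1000
    t = numpy.arange(rate) / float(rate)  # 1 second of samples.
    signal = numpy.sin(2 * numpy.pi * freq * t)
    # Mute 10 ms of the signal starting at 0.5 s to create an anomaly.
    signal[rate // 2:rate // 2 + rate // 100] = 0
    # Expect time markers near 0.5 s (within block_size samples).
    return anomaly_detection(signal, rate, freq)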


def get_anomaly_durations(signal,
                          rate,
                          freq,
                          block_size=ANOMALY_DETECTION_BLOCK_SIZE,
                          threshold=PATTERN_MATCHING_THRESHOLD,
                          tolerance=ANOMALY_GROUPING_TOLERANCE):
    """Detects anomalies in a sine wave and returns their start and end times.

    Runs the anomaly_detection function and parses the resulting array of
    time values into discrete anomalies defined by a (start, end) time tuple.
    Time values are judged to be part of the same anomaly if they lie within
    a given tolerance of half the block_size number of samples of each other.

    Args:
        signal: A 1-D array-like object for 1-channel PCM data.
        rate (int): Sampling rate in samples per second.
            Example inputs: 44100, 48000
        freq (int): The expected frequency of signal.
        block_size (int): The block size in samples to detect anomaly.
        threshold (float): The threshold of correlation index to be judged as
            matched.
        tolerance (float): The number of samples greater than block_size / 2
            that the sample distance between two anomaly time values can be
            and still be grouped as the same anomaly.
    Returns:
        bounds (list): a list of (start, end) tuples where start and end are
            the boundaries in seconds of the detected anomaly.
    """
    bounds = []
    anoms = anomaly_detection(signal, rate, freq, block_size, threshold)
    if len(anoms) == 0:
        return bounds
    end = anoms[0]
    start = anoms[0]
    for i in range(len(anoms) - 1):
        end = anoms[i]
        sample_diff = abs(anoms[i] - anoms[i + 1]) * rate
        # We require a tolerance because sample_diff may be slightly off due
        # to float rounding errors in Python.
        if sample_diff > block_size / 2 + tolerance:
            bounds.append((start, end))
            start = anoms[i + 1]
    bounds.append((start, end))
    return bounds
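

# --- Illustrative usage (not part of the original module) ---
# A minimal sketch, assuming a synthetic 1 kHz sine with a short muted gap,
# of how get_anomaly_durations groups raw time markers into (start, end)
# bounds. The helper name _example_get_anomaly_durations is hypothetical.
def _example_get_anomaly_durations():
    """Returns (start, end) bounds for a sine wave with a muted gap."""
    rate = 48000
    freq = 1000
    t = numpy.arange(rate) / float(rate)
    signal = numpy.sin(2 * numpy.pi * freq * t)
    # Mute 20 ms of the signal starting at 0.25 s.
    signal[rate // 4:rate // 4 + rate // 50] = 0
    # Expect a single (start, end) tuple roughly bracketing 0.25 s to 0.27 s.
    return get_anomaly_durations(signal, rate, freq)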


def _generate_golden_pattern(rate, freq, block_size):
    """Generates a golden pattern of a certain frequency.

    The golden pattern must cover all the possibilities of waveforms in a
    block. So, we need a golden pattern covering 1 period + 1 block size,
    such that the test block can start anywhere in a period, and extends
    a block size.

    |period |1 bk|
    |       |    |
    .   .   .   .
    .   .   .   .
      .   .   .

    Args:
        rate: Sampling rate in samples per second. Example inputs: 44100,
              48000
        freq: The frequency of golden pattern.
        block_size: The block size in samples to detect anomaly.

    Returns:
        A 1-D array for golden pattern.

    """
    samples_in_a_period = int(rate / freq) + 1
    samples_in_golden_pattern = samples_in_a_period + block_size
    golden_x = numpy.linspace(0.0, (samples_in_golden_pattern - 1) * 1.0 /
                              rate, samples_in_golden_pattern)
    golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x)
    return golden_y


def _moving_pattern_matching(golden_signal, test_signal, threshold):
    """Checks if test_signal is similar to any block of golden_signal.

    Compares the test signal with each block of the golden signal by
    correlation index. If there is any block of the golden signal that is
    similar to the test signal, then it is matched.

    Args:
        golden_signal: A 1-D array for golden signal.
        test_signal: A 1-D array for test signal.
        threshold: The threshold of correlation index to be judged as
                   matched.

    Returns:
        True if there is a match. False otherwise.

    Raises:
        ValueError: if test signal is longer than golden signal.

    """
    if len(golden_signal) < len(test_signal):
        raise ValueError('Test signal is longer than golden signal')

    block_length = len(test_signal)
    number_of_movings = len(golden_signal) - block_length + 1
    correlation_indices = []
    for moving_index in range(number_of_movings):
        # Cuts one block of golden signal from start index.
        # The block length is the same as test signal.
        start = moving_index
        end = start + block_length
        golden_signal_block = golden_signal[start:end]
        try:
            correlation_index = _get_correlation_index(golden_signal_block,
                                                       test_signal)
        except TestSignalNormTooSmallError:
            logging.info(
                'Caught one block of test signal that has no meaningful norm')
            return False
        correlation_indices.append(correlation_index)

    # Checks if the maximum correlation index is high enough.
    max_corr = max(correlation_indices)
    if max_corr < threshold:
        logging.debug('Got one unmatched block with max_corr: %s', max_corr)
        return False
    return True


class GoldenSignalNormTooSmallError(Exception):
    """Exception when golden signal norm is too small."""
    pass


class TestSignalNormTooSmallError(Exception):
    """Exception when test signal norm is too small."""
    pass


def _get_correlation_index(golden_signal, test_signal):
    """Computes the correlation index of two signals of the same length.

    Args:
        golden_signal: A 1-D array-like object.
        test_signal: A 1-D array-like object.

    Raises:
        ValueError: if the two signals have different lengths.
        GoldenSignalNormTooSmallError: if golden signal norm is too small.
        TestSignalNormTooSmallError: if test signal norm is too small.

    Returns:
        The correlation index.
    """
    if len(golden_signal) != len(test_signal):
        raise ValueError('Only accepts signal of same length: %s, %s' %
                         (len(golden_signal), len(test_signal)))

    norm_golden = numpy.linalg.norm(golden_signal)
    norm_test = numpy.linalg.norm(test_signal)
    if norm_golden <= _MINIMUM_SIGNAL_NORM:
        raise GoldenSignalNormTooSmallError(
            'No meaningful data as norm is too small.')
    if norm_test <= _MINIMUM_SIGNAL_NORM:
        raise TestSignalNormTooSmallError(
            'No meaningful data as norm is too small.')

    # The 'valid' cross correlation result of two signals of the same length
    # will contain only one number.
    correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0]
    return correlation / (norm_golden * norm_test)


def fundamental_freq(signal, rate):
    """Returns the fundamental frequency of signal by finding the maximum in
    the frequency domain.
    """
    dft = numpy.fft.rfft(signal)
    fund_freq = rate * (numpy.argmax(numpy.abs(dft)) / len(signal))
    return fund_freq


def rms(array):
    """Returns the root mean square of array."""
    return numpy.sqrt(numpy.mean(numpy.absolute(array)**2))


def THDN(signal, rate, q, freq):
    """Measures the THD+N for a signal and returns the result.

    Subtracts the mean to center the signal around 0, removes the fundamental
    frequency from the DFT using a notch filter, and transforms back into a
    signal to get the noise. Computes the ratio of the RMS of the noise
    signal to the RMS of the entire signal.

    Args:
        signal: array of values representing an audio signal.
        rate: sample rate in Hz of the signal.
        q: quality factor for the notch filter.
        freq: fundamental frequency of the signal. All other frequencies
            are noise. If not specified, will be calculated using FFT.
    Returns:
        THDN: THD+N ratio calculated as the ratio of the RMS of the
            harmonics-plus-noise signal to the RMS of the original windowed
            signal.
    """
    # Normalize and window signal.
    signal -= numpy.mean(signal)
    windowed = signal * blackmanharris(len(signal))
    # Find fundamental frequency to remove if not specified.
    freq = freq or fundamental_freq(windowed, rate)
    # Create notch filter to isolate noise.
    w0 = freq / (rate / 2.0)
    b, a = iirnotch(w0, q)
    noise = lfilter(b, a, windowed)
    # Calculate THD+N.
    THDN = rms(noise) / rms(windowed)
    return THDN


def max_THDN(signal, rate, step_size, window_size, q, freq):
    """Analyzes the signal with a moving window and finds the maximum THD+N
    value.

    Args:
        signal: array representing the signal
        rate: sample rate of the signal.
        step_size: how many samples to move the window by for each analysis.
        window_size: how many samples to analyze each time.
        q: quality factor for the notch filter.
        freq: fundamental frequency of the signal. All other frequencies
            are noise. If not specified, will be calculated using FFT.
    Returns:
        greatest_THDN: the greatest THD+N value found across all windows
    """
    greatest_THDN = 0
    cur = 0
    while cur + window_size < len(signal):
        window = signal[cur:cur + window_size]
        res = THDN(window, rate, q, freq)
        cur += step_size
        if res > greatest_THDN:
            greatest_THDN = res
    return greatest_THDN
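

# --- Illustrative usage (not part of the original module) ---
# A minimal sketch, assuming a synthetic 1 kHz sine with added white noise,
# of how THDN and max_THDN can be called. The helper name _example_thdn and
# the chosen quality factor q=10 are hypothetical values for illustration.
def _example_thdn():
    """Returns (overall THD+N, windowed max THD+N) for a noisy sine."""
    rate = 48000
    t = numpy.arange(rate) / float(rate)
    signal = numpy.sin(2 * numpy.pi * 1000 * t)
    signal += 0.01 * numpy.random.standard_normal(len(signal))
    overall = THDN(signal, rate, q=10, freq=1000)
    # Analyze 100 ms windows, moving 50 ms at a time.
    windowed_max = max_THDN(signal, rate, step_size=rate // 20,
                            window_size=rate // 10, q=10, freq=1000)
    return overall, windowed_max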


def get_file_THDN(filename, q, freq=None):
    """Gets THD+N values for each channel of an audio file.

    Args:
        filename (str): path to the audio file.
          (supported file types: http://www.mega-nerd.com/libsndfile/#Features)
        q (float): quality factor for the notch filter.
        freq (int|float): fundamental frequency of the signal. All other
            frequencies are noise. If None, will be calculated with FFT.
    Returns:
        channel_results (list): THD+N value for each channel's signal.
            List index corresponds to channel index.
    """
    audio_file = soundfile.SoundFile(filename)
    channel_results = []
    if audio_file.channels == 1:
        channel_results.append(THDN(signal=audio_file.read(),
                                    rate=audio_file.samplerate,
                                    q=q,
                                    freq=freq))
    else:
        for ch_no, channel in enumerate(audio_file.read().transpose()):
            channel_results.append(THDN(signal=channel,
                                        rate=audio_file.samplerate,
                                        q=q,
                                        freq=freq))
    return channel_results


def get_file_max_THDN(filename, step_size, window_size, q, freq=None):
    """Gets the max THD+N value across analysis windows for each channel of
    an audio file.

    Args:
        filename (str): path to the audio file.
          (supported file types: http://www.mega-nerd.com/libsndfile/#Features)
        step_size (int): how many samples to move the window by for each
            analysis.
        window_size (int): how many samples to analyze each time.
        q (float): quality factor for the notch filter.
        freq (int|float): fundamental frequency of the signal. All other
            frequencies are noise. If None, will be calculated with FFT.
    Returns:
        channel_results (list): max THD+N value for each channel's signal.
            List index corresponds to channel index.
    """
    audio_file = soundfile.SoundFile(filename)
    channel_results = []
    if audio_file.channels == 1:
        channel_results.append(max_THDN(signal=audio_file.read(),
                                        rate=audio_file.samplerate,
                                        step_size=step_size,
                                        window_size=window_size,
                                        q=q,
                                        freq=freq))
    else:
        for ch_no, channel in enumerate(audio_file.read().transpose()):
            channel_results.append(max_THDN(signal=channel,
                                            rate=audio_file.samplerate,
                                            step_size=step_size,
                                            window_size=window_size,
                                            q=q,
                                            freq=freq))
    return channel_results


def get_file_anomaly_durations(filename, freq=None,
                               block_size=ANOMALY_DETECTION_BLOCK_SIZE,
                               threshold=PATTERN_MATCHING_THRESHOLD,
                               tolerance=ANOMALY_GROUPING_TOLERANCE):
    """Gets the durations of anomalies for each channel of an audio file.

    Args:
        filename (str): path to the audio file.
          (supported file types: http://www.mega-nerd.com/libsndfile/#Features)
        freq (int|float): fundamental frequency of the signal. All other
            frequencies are noise. If None, will be calculated with FFT.
        block_size (int): The block size in samples to detect anomaly.
        threshold (float): The threshold of correlation index to be judged as
            matched.
        tolerance (float): The number of samples greater than block_size / 2
            that the sample distance between two anomaly time values can be
            and still be grouped as the same anomaly.
    Returns:
        channel_results (list): anomaly durations for each channel's signal.
            List index corresponds to channel index.
    """
    audio_file = soundfile.SoundFile(filename)
    signal = audio_file.read()
    freq = freq or fundamental_freq(signal, audio_file.samplerate)
    channel_results = []
    if audio_file.channels == 1:
        channel_results.append(get_anomaly_durations(
            signal=signal,
            rate=audio_file.samplerate,
            freq=freq,
            block_size=block_size,
            threshold=threshold,
            tolerance=tolerance))
    else:
        for ch_no, channel in enumerate(signal.transpose()):
            channel_results.append(get_anomaly_durations(
                signal=channel,
                rate=audio_file.samplerate,
                freq=freq,
                block_size=block_size,
                threshold=threshold,
                tolerance=tolerance))
    return channel_results
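

# --- Illustrative usage (not part of the original module) ---
# A minimal end-to-end sketch: write a synthetic sine to a temporary WAV file
# with soundfile, then run the file-based helpers on it. The helper name
# _example_file_analysis, the temporary-file handling, and q=10 are
# hypothetical choices for illustration.
def _example_file_analysis():
    """Returns (THD+N per channel, anomaly durations per channel)."""
    import tempfile

    rate = 48000
    t = numpy.arange(rate) / float(rate)
    signal = 0.5 * numpy.sin(2 * numpy.pi * 1000 * t)
    with tempfile.NamedTemporaryFile(suffix='.wav') as wav:
        soundfile.write(wav.name, signal, rate)
        thdn_per_channel = get_file_THDN(wav.name, q=10, freq=1000)
        anomalies_per_channel = get_file_anomaly_durations(wav.name,
                                                           freq=1000)
    return thdn_per_channel, anomalies_per_channel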